blob: f4efa3aec024be90f3441e79315537bf24f19979 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/vdata_stream_sender.h"
#include <fmt/format.h>
#include <fmt/ranges.h> // IWYU pragma: keep
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>
#include <stddef.h>
#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <random>
#include "common/object_pool.h"
#include "common/status.h"
#include "exec/tablet_info.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/result_file_sink_operator.h"
#include "runtime/descriptors.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "util/proto_util.h"
#include "vec/columns/column_const.h"
#include "vec/columns/columns_number.h"
#include "vec/common/sip_hash.h"
#include "vec/exprs/vexpr.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/runtime/vdata_stream_recvr.h"
#include "vec/sink/vrow_distribution.h"
#include "vec/sink/writer/vtablet_writer_v2.h"
namespace doris::vectorized {
// Initializes the channel for the non-pipeline execution path: fills the
// reusable PTransmitDataParams request, then binds either a local receiver
// (same-process exchange) or a remote brpc stub.
template <typename Parent>
Status Channel<Parent>::init(RuntimeState* state) {
    _be_number = state->be_number();
    if (_brpc_dest_addr.hostname.empty()) {
        LOG(WARNING) << "there is no brpc destination address's hostname"
                        ", maybe version is not compatible.";
        return Status::InternalError("no brpc destination");
    }
    _brpc_request = std::make_shared<PTransmitDataParams>();
    // initialize brpc request
    _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi);
    _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo);
    _finst_id = _brpc_request->finst_id();
    _brpc_request->mutable_query_id()->set_hi(state->query_id().hi);
    _brpc_request->mutable_query_id()->set_lo(state->query_id().lo);
    _query_id = _brpc_request->query_id();
    _brpc_request->set_node_id(_dest_node_id);
    _brpc_request->set_sender_id(_parent->sender_id());
    _brpc_request->set_be_number(_be_number);
    // Cap the per-rpc timeout at one hour even for longer query timeouts.
    _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
    if (state->query_options().__isset.enable_local_exchange) {
        _is_local &= state->query_options().enable_local_exchange;
    }
    if (_is_local) {
        // A missing local receiver is tolerated here (it may have finished
        // already); validity is re-checked on every send via _recvr_is_valid().
        WARN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr(
                              _fragment_instance_id, _dest_node_id, &_local_recvr),
                      "");
    } else {
        auto network_address = _brpc_dest_addr;
        if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
            // Destination is this host: use loopback for the brpc connection.
            _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(
                    "127.0.0.1", _brpc_dest_addr.port);
            if (config::enable_brpc_connection_check) {
                network_address.hostname = "127.0.0.1";
            }
        } else {
            _brpc_stub =
                    state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr);
        }
        if (!_brpc_stub) {
            std::string msg = fmt::format("Get rpc stub failed, dest_addr={}:{}",
                                          _brpc_dest_addr.hostname, _brpc_dest_addr.port);
            LOG(WARNING) << msg;
            return Status::InternalError(msg);
        }
        if (config::enable_brpc_connection_check) {
            // Register the stub so the query context can health-check it.
            state->get_query_ctx()->add_using_brpc_stub(network_address, _brpc_stub);
        }
    }
    _serializer.set_is_local(_is_local);
    // In bucket shuffle join will set fragment_instance_id (-1, -1)
    // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0'
    // so the empty channel not need call function close_internal()
    _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1);
    _state = state;
    return Status::OK();
}
// Resolves only the brpc stub (pipeline path). Local channels skip stub
// creation entirely; the rest of the setup happens later in open().
template <typename Parent>
Status Channel<Parent>::init_stub(RuntimeState* state) {
    if (_brpc_dest_addr.hostname.empty()) {
        LOG(WARNING) << "there is no brpc destination address's hostname"
                        ", maybe version is not compatible.";
        return Status::InternalError("no brpc destination");
    }
    if (state->query_options().__isset.enable_local_exchange) {
        _is_local &= state->query_options().enable_local_exchange;
    }
    if (_is_local) {
        // Local exchange needs no rpc stub.
        return Status::OK();
    }
    auto network_address = _brpc_dest_addr;
    if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
        // Destination is this host: use loopback for the brpc connection.
        _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(
                "127.0.0.1", _brpc_dest_addr.port);
        if (config::enable_brpc_connection_check) {
            network_address.hostname = "127.0.0.1";
        }
    } else {
        _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr);
    }
    if (!_brpc_stub) {
        std::string msg = fmt::format("Get rpc stub failed, dest_addr={}:{}",
                                      _brpc_dest_addr.hostname, _brpc_dest_addr.port);
        LOG(WARNING) << msg;
        return Status::InternalError(msg);
    }
    if (config::enable_brpc_connection_check) {
        // Register the stub so the query context can health-check it.
        state->get_query_ctx()->add_using_brpc_stub(network_address, _brpc_stub);
    }
    return Status::OK();
}
// Completes channel setup (pipeline path): finds the local receiver if the
// exchange is in-process and builds the reusable brpc request. Pairs with
// init_stub(), which has already resolved the remote stub.
template <typename Parent>
Status Channel<Parent>::open(RuntimeState* state) {
    if (_is_local) {
        // A missing local receiver is tolerated here (it may have finished
        // already); validity is re-checked on every send via _recvr_is_valid().
        WARN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr(
                              _fragment_instance_id, _dest_node_id, &_local_recvr),
                      "");
    }
    _be_number = state->be_number();
    _brpc_request = std::make_shared<PTransmitDataParams>();
    // initialize brpc request
    _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi);
    _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo);
    _finst_id = _brpc_request->finst_id();
    _brpc_request->mutable_query_id()->set_hi(state->query_id().hi);
    _brpc_request->mutable_query_id()->set_lo(state->query_id().lo);
    _query_id = _brpc_request->query_id();
    _brpc_request->set_node_id(_dest_node_id);
    _brpc_request->set_sender_id(_parent->sender_id());
    _brpc_request->set_be_number(_be_number);
    // Cap the per-rpc timeout at one hour even for longer query timeouts.
    _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
    _serializer.set_is_local(_is_local);
    // In bucket shuffle join will set fragment_instance_id (-1, -1)
    // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0'
    // so the empty channel not need call function close_internal()
    _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1);
    _state = state;
    return Status::OK();
}
// Returns the dependency the pipeline scheduler waits on before writing to
// this local channel. A null receiver is tolerated for ExchangeSinkLocalState
// (returns nullptr); for any other parent it is an internal error.
template <typename Parent>
std::shared_ptr<pipeline::Dependency> PipChannel<Parent>::get_local_channel_dependency() {
    if (!Channel<Parent>::_local_recvr) {
        if constexpr (std::is_same_v<pipeline::ExchangeSinkLocalState, Parent>) {
            return nullptr;
        } else {
            throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null");
        }
    }
    return Channel<Parent>::_local_recvr->get_local_channel_dependency(
            Channel<Parent>::_parent->sender_id());
}
// Sends the block currently buffered in the serializer: locally (no
// serialization) or remotely via _ch_cur_pb_block, rolling the pb block
// afterwards so an in-flight rpc can keep reading the previous one.
template <typename Parent>
Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
    // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario
    // so this feature is turned off here by default. We need to re-examine this logic
    if (is_local()) {
        return send_local_block(exec_status, eos);
    }
    SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
    if (eos) {
        // Flush whatever remains in the serializer into the pb block.
        RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
    }
    RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status));
    ch_roll_pb_block();
    return Status::OK();
}
// Moves the serializer's buffered rows into a Block and hands it to the local
// receiver (with move semantics). On eos also detaches this sender from the
// receiver. If the receiver is gone, drops the data and reports its status.
template <typename Parent>
Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
    SCOPED_TIMER(_parent->local_send_timer());
    Block block = _serializer.get_block()->to_block();
    // Re-arm the serializer with empty columns of the same layout so it can
    // keep accumulating rows after the move above.
    _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
    if (_recvr_is_valid()) {
        if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
            COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
            COUNTER_UPDATE(_parent->local_sent_rows(), block.rows());
            COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
        }
        _local_recvr->add_block(&block, _parent->sender_id(), true);
        if (eos) {
            _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
        }
        return Status::OK();
    } else {
        _serializer.reset_block();
        return _receiver_status;
    }
}
// Passes a caller-owned block to the local receiver. can_be_moved tells the
// receiver whether it may steal the block's columns instead of copying.
template <typename Parent>
Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
    SCOPED_TIMER(_parent->local_send_timer());
    if (_recvr_is_valid()) {
        if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
            COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
            COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
            COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
        }
        _local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
        return Status::OK();
    } else {
        return _receiver_status;
    }
}
// Sends a serialized PBlock to the remote receiver over brpc. Only one rpc is
// in flight per channel: before reusing the callback we wait for the previous
// rpc to finish. The PBlock is attached to the request without copying via
// set_allocated_block/release_block, so ownership returns to the caller.
template <typename Parent>
Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) {
    if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
        COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
    }
    SCOPED_TIMER(_parent->brpc_send_timer());
    if (_send_remote_block_callback == nullptr) {
        _send_remote_block_callback = DummyBrpcCallback<PTransmitDataResult>::create_shared();
    } else {
        // Wait for the previous rpc before reusing the controller.
        RETURN_IF_ERROR(_wait_last_brpc());
        _send_remote_block_callback->cntl_->Reset();
    }
    VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << print_id(_fragment_instance_id)
             << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname
             << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string();
    _brpc_request->set_eos(eos);
    if (!exec_status.ok()) {
        exec_status.to_protobuf(_brpc_request->mutable_exec_status());
    }
    if (block != nullptr) {
        // Attach without copying; released again below.
        _brpc_request->set_allocated_block(block);
    }
    _brpc_request->set_packet_seq(_packet_seq++);
    _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms);
    if (config::exchange_sink_ignore_eovercrowded) {
        _send_remote_block_callback->cntl_->ignore_eovercrowded();
    }
    {
        auto send_remote_block_closure =
                AutoReleaseClosure<PTransmitDataParams, DummyBrpcCallback<PTransmitDataResult>>::
                        create_unique(_brpc_request, _send_remote_block_callback);
        if (enable_http_send_block(*_brpc_request)) {
            // Oversized payloads go through the http channel instead of brpc.
            RETURN_IF_ERROR(transmit_block_httpv2(
                    _state->exec_env(), std::move(send_remote_block_closure), _brpc_dest_addr));
        } else {
            transmit_blockv2(*_brpc_stub, std::move(send_remote_block_closure));
        }
    }
    if (block != nullptr) {
        // Give ownership of the PBlock back to the caller.
        static_cast<void>(_brpc_request->release_block());
    }
    return Status::OK();
}
// Appends the selected rows of `block` to this channel's serializer; when a
// batch fills up (or eos), ships the serialized batch to the destination.
template <typename Parent>
Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) {
    // Dummy (camouflaged) channels are skipped entirely.
    // NOTE(review): only .lo is tested here while _need_close checks both hi
    // and lo being -1 — confirm dummy channels always use (-1, -1).
    if (_fragment_instance_id.lo == -1) {
        return Status::OK();
    }
    bool serialized = false;
    RETURN_IF_ERROR(
            _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows));
    if (serialized) {
        RETURN_IF_ERROR(send_current_block(false, Status::OK()));
    }
    return Status::OK();
}
// Waits for the last in-flight rpc issued during close. END_OF_FILE from the
// receiver is treated as a normal shutdown.
// NOTE(review): when _need_close is true this returns before
// _serializer.reset_block() — confirm the serializer is released elsewhere
// on that path.
template <typename Parent>
Status Channel<Parent>::close_wait(RuntimeState* state) {
    if (_need_close) {
        Status st = _wait_last_brpc();
        if (st.is<ErrorCode::END_OF_FILE>()) {
            st = Status::OK();
        } else if (!st.ok()) {
            state->log_error(st.to_string());
        }
        _need_close = false;
        return st;
    }
    _serializer.reset_block();
    return Status::OK();
}
// Flushes any remaining buffered rows and sends the eos marker to the
// receiver. Does not wait for the final rpc — that is close_wait()'s job.
template <typename Parent>
Status Channel<Parent>::close_internal(Status exec_status) {
    if (!_need_close) {
        return Status::OK();
    }
    VLOG_RPC << "Channel::close_internal() instance_id=" << print_id(_fragment_instance_id)
             << " dest_node=" << _dest_node_id << " #rows= "
             << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows())
             << " receiver status: " << _receiver_status << ", exec_status: " << exec_status;
    if (is_receiver_eof()) {
        // Receiver already finished; nothing more to send.
        _serializer.reset_block();
        return Status::OK();
    }
    Status status;
    if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) {
        // Flush remaining rows together with the eos flag.
        status = send_current_block(true, exec_status);
    } else {
        SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
        if (is_local()) {
            if (_recvr_is_valid()) {
                _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
            }
        } else {
            // Non pipeline engine will send an empty eos block
            status = send_remote_block((PBlock*)nullptr, true, exec_status);
        }
    }
    // Don't wait for the last packet to finish, left it to close_wait.
    if (status.is<ErrorCode::END_OF_FILE>()) {
        return Status::OK();
    } else {
        return status;
    }
}
// Closes the channel exactly once; repeated calls are no-ops. Any failure
// from close_internal() is logged against the runtime state and returned.
template <typename Parent>
Status Channel<Parent>::close(RuntimeState* state, Status exec_status) {
    if (_closed) {
        return Status::OK();
    }
    _closed = true;
    auto status = close_internal(exec_status);
    if (!status.ok()) {
        state->log_error(status.to_string());
    }
    return status;
}
// Swaps the active serialization buffer between the two per-channel PBlocks,
// so the next batch can be serialized while the previous rpc is in flight.
template <typename Parent>
void Channel<Parent>::ch_roll_pb_block() {
    if (_ch_cur_pb_block == &_ch_pb_block1) {
        _ch_cur_pb_block = &_ch_pb_block2;
    } else {
        _ch_cur_pb_block = &_ch_pb_block1;
    }
}
// Builds the sender from a TDataStreamSink. One channel is created per
// distinct destination fragment instance; duplicated instances (bucket
// shuffle) share the same underlying channel via _channel_shared_ptrs while
// _channels holds only the unique ones.
VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                                     const RowDescriptor& row_desc, const TDataStreamSink& sink,
                                     const std::vector<TPlanFragmentDestination>& destinations)
        : DataSink(row_desc),
          _sender_id(sender_id),
          _state(state),
          _pool(pool),
          _current_channel_idx(0),
          _part_type(sink.output_partition.type),
          _dest_node_id(sink.dest_node_id),
          _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc),
          _serializer(this) {
    DCHECK_GT(destinations.size(), 0);
    DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
           sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
           sink.output_partition.type == TPartitionType::RANDOM ||
           sink.output_partition.type == TPartitionType::RANGE_PARTITIONED ||
           sink.output_partition.type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED ||
           sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED);
    // Deduplicate destinations by the low bits of the fragment instance id.
    std::map<int64_t, int64_t> fragment_id_to_channel_index;
    _enable_pipeline_exec = state->enable_pipeline_exec();
    for (int i = 0; i < destinations.size(); ++i) {
        const auto& fragment_instance_id = destinations[i].fragment_instance_id;
        if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
            fragment_id_to_channel_index.end()) {
            if (_enable_pipeline_exec) {
                _channel_shared_ptrs.emplace_back(new PipChannel<VDataStreamSender>(
                        this, row_desc, destinations[i].brpc_server, fragment_instance_id,
                        sink.dest_node_id));
            } else {
                _channel_shared_ptrs.emplace_back(
                        new Channel(this, row_desc, destinations[i].brpc_server,
                                    fragment_instance_id, sink.dest_node_id));
            }
            fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                                 _channel_shared_ptrs.size() - 1);
            _channels.push_back(_channel_shared_ptrs.back().get());
        } else {
            // Duplicate destination: alias the existing channel.
            _channel_shared_ptrs.emplace_back(
                    _channel_shared_ptrs[fragment_id_to_channel_index[fragment_instance_id.lo]]);
        }
    }
    _name = "VDataStreamSender";
    if (_enable_pipeline_exec) {
        // Pre-allocate the broadcast buffer pool used by the pipeline engine.
        _broadcast_pb_blocks = vectorized::BroadcastPBlockHolderQueue::create_shared();
        for (int i = 0; i < config::num_broadcast_buffer; ++i) {
            _broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
        }
    } else {
        _cur_pb_block = &_pb_block1;
    }
}
// Alternate constructor (UNPARTITIONED only, non-pipeline), used by sinks
// that address destinations directly by plan node id.
// NOTE(review): unlike the TDataStreamSink constructor, the map emplace and
// _channels.push_back run even when the fragment id was already seen, so a
// duplicate destination would re-push the most recently created channel —
// confirm callers never pass duplicate fragment instance ids here.
VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                                     const RowDescriptor& row_desc, PlanNodeId dest_node_id,
                                     const std::vector<TPlanFragmentDestination>& destinations)
        : DataSink(row_desc),
          _sender_id(sender_id),
          _state(state),
          _pool(pool),
          _current_channel_idx(0),
          _part_type(TPartitionType::UNPARTITIONED),
          _dest_node_id(dest_node_id),
          _serializer(this) {
    _cur_pb_block = &_pb_block1;
    _name = "VDataStreamSender";
    std::map<int64_t, int64_t> fragment_id_to_channel_index;
    for (int i = 0; i < destinations.size(); ++i) {
        const auto& fragment_instance_id = destinations[i].fragment_instance_id;
        if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
            fragment_id_to_channel_index.end()) {
            _channel_shared_ptrs.emplace_back(new Channel(this, row_desc,
                                                          destinations[i].brpc_server,
                                                          fragment_instance_id, _dest_node_id));
        }
        fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                             _channel_shared_ptrs.size() - 1);
        _channels.push_back(_channel_shared_ptrs.back().get());
    }
}
// Channels may be aliased between slots (bucket shuffle); dropping the shared
// pointers releases each channel once its last owner is gone.
VDataStreamSender::~VDataStreamSender() {
    _channel_shared_ptrs.clear();
}
// Creates the partitioner (or tablet-sink machinery) matching the sink's
// partition type. UNPARTITIONED/RANDOM need no extra state here.
Status VDataStreamSender::init(const TDataSink& tsink) {
    RETURN_IF_ERROR(DataSink::init(tsink));
    const TDataStreamSink& t_stream_sink = tsink.stream_sink;
    if (_part_type == TPartitionType::HASH_PARTITIONED) {
        // Hash shuffle targets the unique channels only.
        _partition_count = _channels.size();
        _partitioner.reset(new XXHashPartitioner<ShuffleChannelIds>(_channels.size()));
        RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs));
    } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
        // Bucket shuffle hashes over all slots, including aliased channels.
        _partition_count = _channel_shared_ptrs.size();
        _partitioner.reset(
                new Crc32HashPartitioner<ShuffleChannelIds>(_channel_shared_ptrs.size()));
        RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs));
    } else if (_part_type == TPartitionType::RANGE_PARTITIONED) {
        return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used");
    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
        // Tablet-sink shuffle: rows are routed by tablet id, so we need the
        // olap schema, partition params and tablet finder.
        _txn_id = t_stream_sink.tablet_sink_txn_id;
        _schema = std::make_shared<OlapTableSchemaParam>();
        RETURN_IF_ERROR(_schema->init(t_stream_sink.tablet_sink_schema));
        _vpartition = std::make_unique<VOlapTablePartitionParam>(
                _schema, t_stream_sink.tablet_sink_partition);
        RETURN_IF_ERROR(_vpartition->init());
        auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
        _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition.get(), find_tablet_mode);
        _tablet_sink_tuple_desc =
                _state->desc_tbl().get_tuple_descriptor(t_stream_sink.tablet_sink_tuple_id);
        _tablet_sink_row_desc = _pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false));
        //_block_convertor no need init_autoinc_info here
        _block_convertor =
                std::make_unique<vectorized::OlapTableBlockConvertor>(_tablet_sink_tuple_desc);
        _location = _pool->add(new OlapTableLocationParam(t_stream_sink.tablet_sink_location));
        _row_distribution.init({.state = _state,
                                .block_convertor = _block_convertor.get(),
                                .tablet_finder = _tablet_finder.get(),
                                .vpartition = _vpartition.get(),
                                .add_partition_request_timer = _add_partition_request_timer,
                                .txn_id = _txn_id,
                                .pool = _pool,
                                .location = _location,
                                .vec_output_expr_ctxs = &_fake_expr_ctxs,
                                .schema = _schema,
                                .caller = (void*)this,
                                .create_partition_callback = &empty_callback_function});
    } else {
        // UNPARTITIONED
    }
    return Status::OK();
}
// Sets up profile counters, the memory tracker and the partitioner. For
// UNPARTITIONED/RANDOM the channel order is shuffled once so concurrent
// senders do not all hit the same receiver first.
Status VDataStreamSender::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(DataSink::prepare(state));
    std::vector<std::string> instances;
    for (const auto& channel : _channels) {
        instances.emplace_back(channel->get_fragment_instance_id_str());
    }
    std::string title = fmt::format("VDataStreamSender (dst_id={}, dst_fragments=[{}])",
                                    _dest_node_id, instances);
    _profile = _pool->add(new RuntimeProfile(title));
    init_sink_common_profile();
    SCOPED_TIMER(_profile->total_time_counter());
    _mem_tracker = std::make_unique<MemTracker>("VDataStreamSender:" +
                                                print_id(state->fragment_instance_id()));
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
    if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
        // Randomize channel order to spread load across receivers.
        std::random_device rd;
        std::mt19937 g(rd());
        shuffle(_channels.begin(), _channels.end(), g);
    } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
               _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
        RETURN_IF_ERROR(_partitioner->prepare(state, _row_desc));
        if (_part_type == TPartitionType::HASH_PARTITIONED) {
            _profile->add_info_string("Partitioner",
                                      fmt::format("XXHashPartitioner({})", _partition_count));
        } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
            _profile->add_info_string("Partitioner",
                                      fmt::format("Crc32HashPartitioner({})", _partition_count));
        }
    }
    // Profile counters for bytes/rows/time spent on each send path.
    _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
    _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
    _local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT);
    _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
    _compress_timer = ADD_TIMER(profile(), "CompressTime");
    _brpc_send_timer = ADD_TIMER(profile(), "BrpcSendTime");
    _brpc_wait_timer = ADD_TIMER(profile(), "BrpcSendTime.Wait");
    _local_send_timer = ADD_TIMER(profile(), "LocalSendTime");
    _split_block_hash_compute_timer = ADD_TIMER(profile(), "SplitBlockHashComputeTime");
    _split_block_distribute_by_channel_timer =
            ADD_TIMER(profile(), "SplitBlockDistributeByChannelTime");
    _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
    _overall_throughput = profile()->add_derived_counter(
            "OverallThroughput", TUnit::BYTES_PER_SECOND,
            std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
                               profile()->total_time_counter()),
            "");
    _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES);
    _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
    _peak_memory_usage_counter =
            profile()->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
    return Status::OK();
}
// Initializes every channel, records whether the whole exchange is local
// (enables the no-serialization fast path), and opens the partitioner or the
// row-distribution machinery depending on partition type.
Status VDataStreamSender::open(RuntimeState* state) {
    DCHECK(state != nullptr);
    int local_size = 0;
    for (int i = 0; i < _channels.size(); ++i) {
        RETURN_IF_ERROR(_channels[i]->init(state));
        if (_channels[i]->is_local()) {
            local_size++;
        }
    }
    _only_local_exchange = local_size == _channels.size();
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
    if (_part_type == TPartitionType::HASH_PARTITIONED ||
        _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
        RETURN_IF_ERROR(_partitioner->open(state));
    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
        RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
    }
    _compression_type = state->fragement_transmission_compression_type();
    return Status::OK();
}
// Marks a channel as receiver-EOF (recording the triggering status) and then
// closes it; the close status is deliberately ignored.
template <typename ChannelPtrType>
void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
                                            Status st) {
    channel->set_receiver_eof(st);
    // Channel will not send RPC to the downstream when eof, so close channel by OK status.
    static_cast<void>(channel->close(state, Status::OK()));
}
// Flushes rows that were batched while waiting for automatic partition
// creation (tablet-sink shuffle): creates the partitions, re-sends the
// batched block through send(), then restores the batching block's columns.
Status VDataStreamSender::_send_new_partition_batch() {
    if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time
        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
        Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
        // these order is only.
        // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
        // 2. deal batched block
        // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
        _row_distribution.clear_batching_stats();
        RETURN_IF_ERROR(this->send(_state, &tmp_block, false));
        // Recovery back
        _row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
        _row_distribution._batching_block->clear_column_data();
        _row_distribution._deal_batched = false;
    }
    return Status::OK();
}
// Dispatches one input block to the destination channels according to the
// partition type:
//   - UNPARTITIONED / single channel: broadcast (local fast path, pipeline
//     broadcast buffers, or the classic serialize-and-send path);
//   - RANDOM: round-robin over channels;
//   - HASH / BUCKET_SHUFFLE: hash rows to channels via the partitioner;
//   - TABLET_SINK_SHUFFLE: route rows by crc32 of their tablet id.
// Returns EndOfFile once every receiver has reported EOF. With eos under the
// pipeline engine, closes all channels before returning.
Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
    SCOPED_TIMER(_profile->total_time_counter());
    SCOPED_TIMER(_exec_timer);
    COUNTER_UPDATE(_output_rows_counter, block->rows());
    _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
    // Stop early if no receiver wants more data.
    bool all_receiver_eof = true;
    for (auto channel : _channels) {
        if (!channel->is_receiver_eof()) {
            all_receiver_eof = false;
            break;
        }
    }
    if (all_receiver_eof) {
        return Status::EndOfFile("all data stream channels EOF");
    }
    if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
        // 1. serialize depends on it is not local exchange
        // 2. send block
        // 3. rollover block
        if (_only_local_exchange) {
            // All receivers are in-process: hand the block over directly,
            // no serialization needed.
            if (!block->empty()) {
                Status status;
                for (auto channel : _channels) {
                    if (!channel->is_receiver_eof()) {
                        status = channel->send_local_block(block, false);
                        HANDLE_CHANNEL_STATUS(state, channel, status);
                    }
                }
            }
        } else if (_enable_pipeline_exec) {
            // Pipeline broadcast: serialize once into a pooled PBlock holder
            // shared by every remote channel.
            std::shared_ptr<BroadcastPBlockHolder> block_holder = nullptr;
            RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
            {
                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                bool serialized = false;
                RETURN_IF_ERROR(_serializer.next_serialized_block(
                        block, block_holder->get_block(), _channels.size(), &serialized, eos));
                if (serialized) {
                    auto cur_block = _serializer.get_block()->to_block();
                    if (!cur_block.empty()) {
                        RETURN_IF_ERROR(_serializer.serialize_block(
                                &cur_block, block_holder->get_block(), _channels.size()));
                    } else {
                        block_holder->get_block()->Clear();
                    }
                    Status status;
                    for (auto channel : _channels) {
                        if (!channel->is_receiver_eof()) {
                            if (channel->is_local()) {
                                status = channel->send_local_block(&cur_block, false);
                            } else {
                                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                                status = channel->send_broadcast_block(block_holder, eos);
                            }
                            HANDLE_CHANNEL_STATUS(state, channel, status);
                        }
                    }
                    // Recycle the columns back into the serializer.
                    cur_block.clear_column_data();
                    _serializer.get_block()->set_mutable_columns(cur_block.mutate_columns());
                }
            }
        } else {
            // Classic (non-pipeline) broadcast path via _cur_pb_block.
            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
            bool serialized = false;
            RETURN_IF_ERROR(_serializer.next_serialized_block(
                    block, _cur_pb_block, _channels.size(), &serialized, false));
            if (serialized) {
                auto cur_block = _serializer.get_block()->to_block();
                if (!cur_block.empty()) {
                    RETURN_IF_ERROR(_serializer.serialize_block(&cur_block, _cur_pb_block,
                                                                _channels.size()));
                }
                Status status;
                for (auto channel : _channels) {
                    if (!channel->is_receiver_eof()) {
                        if (channel->is_local()) {
                            status = channel->send_local_block(&cur_block, false);
                        } else {
                            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                            status = channel->send_remote_block(_cur_pb_block, false);
                        }
                        HANDLE_CHANNEL_STATUS(state, channel, status);
                    }
                }
                // Recycle the columns back into the serializer.
                cur_block.clear_column_data();
                _serializer.get_block()->set_mutable_columns(cur_block.mutate_columns());
                _roll_pb_block();
            }
        }
    } else if (_part_type == TPartitionType::RANDOM) {
        // 1. select channel
        Channel<VDataStreamSender>* current_channel = _channels[_current_channel_idx];
        if (!current_channel->is_receiver_eof()) {
            // 2. serialize, send and rollover block
            if (current_channel->is_local()) {
                auto status = current_channel->send_local_block(block, false);
                HANDLE_CHANNEL_STATUS(state, current_channel, status);
            } else {
                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                RETURN_IF_ERROR(
                        _serializer.serialize_block(block, current_channel->ch_cur_pb_block()));
                auto status =
                        current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
                HANDLE_CHANNEL_STATUS(state, current_channel, status);
                current_channel->ch_roll_pb_block();
            }
        }
        _current_channel_idx = (_current_channel_idx + 1) % _channels.size();
    } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
               _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
        auto rows = block->rows();
        {
            SCOPED_TIMER(_split_block_hash_compute_timer);
            RETURN_IF_ERROR(_partitioner->do_partitioning(state, block, _mem_tracker.get()));
        }
        if (_part_type == TPartitionType::HASH_PARTITIONED) {
            RETURN_IF_ERROR(channel_add_rows(state, _channels, _partition_count,
                                             _partitioner->get_channel_ids().get<uint64_t>(), rows,
                                             block, _enable_pipeline_exec ? eos : false));
        } else {
            // Bucket shuffle routes over all slots, including aliased channels.
            RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs, _partition_count,
                                             _partitioner->get_channel_ids().get<uint32_t>(), rows,
                                             block, _enable_pipeline_exec ? eos : false));
        }
    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
        // check out of limit
        RETURN_IF_ERROR(_send_new_partition_batch());
        std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
        const auto& num_channels = _channels.size();
        std::vector<std::vector<uint32>> channel2rows;
        channel2rows.resize(num_channels);
        auto input_rows = block->rows();
        if (input_rows > 0) {
            bool has_filtered_rows = false;
            int64_t filtered_rows = 0;
            _number_input_rows += input_rows;
            RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
                    *block, convert_block, filtered_rows, has_filtered_rows, _row_part_tablet_ids,
                    _number_input_rows));
            // Route each row to a channel by the crc32 hash of its tablet id.
            const auto& row_ids = _row_part_tablet_ids[0].row_ids;
            const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
            for (int idx = 0; idx < row_ids.size(); ++idx) {
                const auto& row = row_ids[idx];
                const auto& tablet_id_hash =
                        HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
                channel2rows[tablet_id_hash % num_channels].emplace_back(row);
            }
        }
        if (eos) {
            // Flush any rows batched while waiting for partition creation.
            _row_distribution._deal_batched = true;
            RETURN_IF_ERROR(_send_new_partition_batch());
        }
        RETURN_IF_ERROR(channel_add_rows_with_idx(state, _channels, num_channels, channel2rows,
                                                  convert_block.get(),
                                                  _enable_pipeline_exec ? eos : false));
    } else {
        // Range partition
        // 1. calculate range
        // 2. dispatch rows to channel
    }
    // If eos == true, then this is the last block, should close the channel in this step.
    Status final_st = Status::OK();
    // For non-pipeline engine, there maybe an block in serializer, should wait for
    if (eos && _enable_pipeline_exec) {
        _serializer.reset_block();
        for (int i = 0; i < _channels.size(); ++i) {
            // For non-pipeline engine, this API maybe hang to wait last rpc.
            // For pipeline engine, it will add block to exchange sink buffer,
            // and then come into pending finish state.
            Status st = _channels[i]->close(state, Status::OK());
            if (!st.ok() && final_st.ok()) {
                final_st = st;
            }
        }
    }
    return final_st;
}
// Closes the sink. For the non-pipeline engine this flushes any block still
// buffered in the serializer, closes every channel and waits for their last
// rpcs; the pipeline engine has already done that in send(eos). Finally
// records load-filtering stats (tablet-sink shuffle) and peak memory.
Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
    SCOPED_TIMER(_exec_timer);
    if (_closed) {
        return Status::OK();
    }
    Status final_st = Status::OK();
    if (!state->enable_pipeline_exec()) {
        {
            // send last block
            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
            // non pipeline engine not pass eos in send function, and maybe have create partition at last block
            // so at here to check again
            if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
                _row_distribution._deal_batched = true;
                RETURN_IF_ERROR(_send_new_partition_batch());
            }
            if (_serializer.get_block() && _serializer.get_block()->rows() > 0) {
                // Flush the remainder buffered in the serializer.
                Block block = _serializer.get_block()->to_block();
                RETURN_IF_ERROR(
                        _serializer.serialize_block(&block, _cur_pb_block, _channels.size()));
                Status status;
                for (auto channel : _channels) {
                    if (!channel->is_receiver_eof()) {
                        if (channel->is_local()) {
                            status = channel->send_local_block(&block, false);
                        } else {
                            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                            status = channel->send_remote_block(_cur_pb_block, false);
                        }
                        HANDLE_CHANNEL_STATUS(state, channel, status);
                    }
                }
            }
        }
        // Close all channels, keeping the first failure.
        for (int i = 0; i < _channels.size(); ++i) {
            Status st = _channels[i]->close(state, exec_status);
            if (!st.ok() && final_st.ok()) {
                final_st = st;
            }
        }
        // wait all channels to finish
        for (int i = 0; i < _channels.size(); ++i) {
            Status st = _channels[i]->close_wait(state);
            if (!st.ok() && final_st.ok()) {
                final_st = st;
            }
        }
    }
    if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
        // Report rows dropped by conversion/tablet-finding to the load stats.
        _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
                                              _tablet_finder->num_filtered_rows());
        _state->update_num_rows_load_unselected(
                _tablet_finder->num_immutable_partition_filtered_rows());
    }
    if (_peak_memory_usage_counter) {
        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
    }
    RETURN_IF_ERROR(DataSink::close(state, exec_status));
    return final_st;
}
// Batches rows up to the parent's configured batch size before serializing;
// is_local disables serialization entirely (local exchange passes Blocks).
template <typename Parent>
BlockSerializer<Parent>::BlockSerializer(Parent* parent, bool is_local)
        : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {}
// Accumulates rows of `block` (all rows, or just the indices in `rows`) into
// the internal mutable block. Once the batch size is reached or eos is set,
// serializes into `dest` (remote only) and reports *serialized = true so the
// caller knows to ship the batch.
template <typename Parent>
Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
                                                      bool* serialized, bool eos,
                                                      const std::vector<uint32_t>* rows) {
    if (_mutable_block == nullptr) {
        // Lazily create the accumulator with the input block's layout.
        SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
        _mutable_block = MutableBlock::create_unique(block->clone_empty());
    }
    {
        SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
        if (rows) {
            if (!rows->empty()) {
                SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
                const auto* begin = rows->data();
                RETURN_IF_ERROR(_mutable_block->add_rows(block, begin, begin + rows->size()));
            }
        } else if (!block->empty()) {
            SCOPED_TIMER(_parent->merge_block_timer());
            RETURN_IF_ERROR(_mutable_block->merge(*block));
        }
    }
    if (_mutable_block->rows() >= _batch_size || eos) {
        if (!_is_local) {
            // Local exchange skips serialization; the caller moves the Block.
            RETURN_IF_ERROR(serialize_block(dest, num_receivers));
        }
        *serialized = true;
        return Status::OK();
    }
    *serialized = false;
    return Status::OK();
}
// Serializes the internally accumulated rows into `dest`, then returns the
// (now empty) columns to the accumulator for reuse. No-op when empty.
template <typename Parent>
Status BlockSerializer<Parent>::serialize_block(PBlock* dest, int num_receivers) {
    if (_mutable_block && _mutable_block->rows() > 0) {
        auto block = _mutable_block->to_block();
        RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
        block.clear_column_data();
        _mutable_block->set_mutable_columns(block.mutate_columns());
    }
    return Status::OK();
}
// Serializes `src` into `dest` with the parent's compression settings and
// updates byte/row counters, scaled by the number of receivers that will get
// this payload. ResultFileSinkLocalState has none of these counters, so the
// whole body is compiled out for it.
template <typename Parent>
Status BlockSerializer<Parent>::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
    if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
        SCOPED_TIMER(_parent->_serialize_batch_timer);
        dest->Clear();
        size_t uncompressed_bytes = 0, compressed_bytes = 0;
        RETURN_IF_ERROR(src->serialize(
                _parent->_state->be_exec_version(), dest, &uncompressed_bytes, &compressed_bytes,
                _parent->compression_type(), _parent->transfer_large_data_by_brpc()));
        COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
        COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
        COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
        _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes *
                                                                    num_receivers);
        _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
    }
    return Status::OK();
}
// Ping-pongs between the two protobuf blocks so serialization of the next
// batch does not overwrite data still referenced by an in-flight rpc.
void VDataStreamSender::_roll_pb_block() {
    if (_cur_pb_block == &_pb_block1) {
        _cur_pb_block = &_pb_block2;
    } else {
        _cur_pb_block = &_pb_block1;
    }
}
// Pops a reusable broadcast PBlock holder from the pool into *holder.
// Fails when the pool is exhausted; channel_all_can_write() gates callers so
// this should not happen during normal pipeline scheduling.
Status VDataStreamSender::_get_next_available_buffer(
        std::shared_ptr<BroadcastPBlockHolder>* holder) {
    if (!_broadcast_pb_blocks->empty()) {
        *holder = _broadcast_pb_blocks->pop();
        return Status::OK();
    }
    return Status::InternalError("No broadcast buffer left!");
}
// Registers the exchange sink buffer with every channel. Channels created by
// the pipeline-enabled constructor are always PipChannel instances, so the
// downcast below is safe; use a named static_cast instead of a C-style cast
// to make that intent explicit and greppable.
void VDataStreamSender::register_pipeline_channels(
        pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer) {
    for (auto* channel : _channels) {
        static_cast<PipChannel<VDataStreamSender>*>(channel)->register_exchange_buffer(buffer);
    }
}
// Reports whether the sink can accept another block right now. Broadcast
// mode (unpartitioned with remote receivers) depends on the shared buffer
// pool having a free PBlock; every other mode requires each channel to be
// individually writable.
bool VDataStreamSender::channel_all_can_write() {
    const bool uses_broadcast_buffers =
            (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) &&
            !_only_local_exchange;
    if (uses_broadcast_buffers) {
        // This condition means we need use broadcast buffer, so we should make sure
        // there are available buffer before running pipeline
        return !_broadcast_pb_blocks->empty();
    }
    return std::all_of(_channels.begin(), _channels.end(),
                       [](auto* channel) { return channel->can_write(); });
}
// Explicit instantiations: Channel/PipChannel/BlockSerializer member bodies
// live in this translation unit, so every Parent type used by the exchange
// and result-file sinks must be instantiated here to satisfy the linker.
template class Channel<pipeline::ExchangeSinkLocalState>;
template class Channel<VDataStreamSender>;
template class PipChannel<pipeline::ExchangeSinkLocalState>;
template class PipChannel<VDataStreamSender>;
template class Channel<pipeline::ResultFileSinkLocalState>;
template class BlockSerializer<pipeline::ResultFileSinkLocalState>;
template class BlockSerializer<pipeline::ExchangeSinkLocalState>;
template class BlockSerializer<VDataStreamSender>;
} // namespace doris::vectorized