| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exchange_sink_operator.h" |
| |
| #include <gen_cpp/DataSinks_types.h> |
| #include <gen_cpp/Partitions_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <gen_cpp/types.pb.h> |
| |
| #include <memory> |
| #include <mutex> |
| #include <random> |
| #include <string> |
| |
| #include "common/status.h" |
| #include "exchange_sink_buffer.h" |
| #include "pipeline/dependency.h" |
| #include "pipeline/exec/operator.h" |
| #include "pipeline/exec/sort_source_operator.h" |
| #include "pipeline/local_exchange/local_exchange_sink_operator.h" |
| #include "pipeline/pipeline_fragment_context.h" |
| #include "util/runtime_profile.h" |
| #include "util/uid_util.h" |
| #include "vec/columns/column_const.h" |
| #include "vec/exprs/vexpr.h" |
| #include "vec/sink/tablet_sink_hash_partitioner.h" |
| |
| namespace doris::pipeline { |
| #include "common/compile_check_begin.h" |
| bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const { |
| return _parent->cast<ExchangeSinkOperatorX>()._transfer_large_data_by_brpc; |
| } |
| |
| static const std::string timer_name = "WaitForDependencyTime"; |
| |
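// Set up the per-instance sink state: profile counters, one channel per distinct
// destination instance, the RPC sink buffer and its queue dependency (only when
// remote channels exist), and, for the hash partition types, a partitioner.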
| Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { |
| RETURN_IF_ERROR(Base::init(state, info)); |
| SCOPED_TIMER(exec_time_counter()); |
| SCOPED_TIMER(_init_timer); |
| _sender_id = info.sender_id; |
| |
| _bytes_sent_counter = ADD_COUNTER(custom_profile(), "BytesSent", TUnit::BYTES); |
| _uncompressed_bytes_counter = |
| ADD_COUNTER(custom_profile(), "UncompressedRowBatchSize", TUnit::BYTES); |
| _local_sent_rows = ADD_COUNTER(custom_profile(), "LocalSentRows", TUnit::UNIT); |
| _serialize_batch_timer = ADD_TIMER(custom_profile(), "SerializeBatchTime"); |
| _compress_timer = ADD_TIMER(custom_profile(), "CompressTime"); |
| _local_send_timer = ADD_TIMER(custom_profile(), "LocalSendTime"); |
| _split_block_hash_compute_timer = ADD_TIMER(custom_profile(), "SplitBlockHashComputeTime"); |
| _distribute_rows_into_channels_timer = |
| ADD_TIMER(custom_profile(), "DistributeRowsIntoChannelsTime"); |
| _send_new_partition_timer = ADD_TIMER(custom_profile(), "SendNewPartitionTime"); |
| _blocks_sent_counter = |
| ADD_COUNTER_WITH_LEVEL(custom_profile(), "BlocksProduced", TUnit::UNIT, 1); |
| _overall_throughput = custom_profile()->add_derived_counter( |
| "OverallThroughput", TUnit::BYTES_PER_SECOND, |
| [this]() { |
| return RuntimeProfile::units_per_second(_bytes_sent_counter, |
| custom_profile()->total_time_counter()); |
| }, |
| ""); |
| _merge_block_timer = ADD_TIMER(custom_profile(), "MergeBlockTime"); |
| _local_bytes_send_counter = ADD_COUNTER(custom_profile(), "LocalBytesSent", TUnit::BYTES); |
| _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(common_profile(), timer_name, 1); |
| _wait_queue_timer = |
| ADD_CHILD_TIMER_WITH_LEVEL(common_profile(), "WaitForRpcBufferQueue", timer_name, 1); |
| |
| _create_channels(); |
| // Make sure brpc stub is ready before execution. |
| for (auto& channel : channels) { |
| RETURN_IF_ERROR(channel->init(state)); |
| } |
| _wait_broadcast_buffer_timer = |
| ADD_CHILD_TIMER(common_profile(), "WaitForBroadcastBuffer", timer_name); |
| |
| auto& p = _parent->cast<ExchangeSinkOperatorX>(); |
| _part_type = p._part_type; |
| // Shuffle the channels randomly |
| if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || |
| _part_type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED) { |
| std::random_device rd; |
| std::mt19937 g(rd()); |
| shuffle(channels.begin(), channels.end(), g); |
| } |
| |
| size_t local_size = 0; |
| for (int i = 0; i < channels.size(); ++i) { |
| if (channels[i]->is_local()) { |
| local_channel_ids.push_back(i); |
| local_size++; |
| _last_local_channel_idx = i; |
| } |
| } |
| _only_local_exchange = local_size == channels.size(); |
| _rpc_channels_num = channels.size() - local_size; |
| |
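    // Remote channels share an ExchangeSinkBuffer. The queue dependency lets the
    // buffer block this sink while its RPC queue is saturated and wake it up again
    // once in-flight RPCs drain.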
| if (!_only_local_exchange) { |
| _sink_buffer = p.get_sink_buffer(state, state->fragment_instance_id().lo); |
| register_channels(_sink_buffer.get()); |
| _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), |
| "ExchangeSinkQueueDependency", true); |
| _sink_buffer->set_dependency(state->fragment_instance_id().lo, _queue_dependency, this); |
| } |
| |
    if (_part_type == TPartitionType::HASH_PARTITIONED) {
        _partition_count = channels.size();
        if (_state->query_options().__isset.enable_new_shuffle_hash_method &&
            _state->query_options().enable_new_shuffle_hash_method) {
            _partitioner = std::make_unique<vectorized::Crc32CHashPartitioner>(channels.size());
            custom_profile()->add_info_string(
                    "Partitioner", fmt::format("Crc32CHashPartitioner({})", _partition_count));
        } else {
            _partitioner = std::make_unique<
                    vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
                    channels.size());
            custom_profile()->add_info_string(
                    "Partitioner", fmt::format("Crc32HashPartitioner({})", _partition_count));
        }
        RETURN_IF_ERROR(_partitioner->init(p._texprs));
        RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
| } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { |
| _partition_count = channels.size(); |
| _partitioner = |
| std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( |
| channels.size()); |
| RETURN_IF_ERROR(_partitioner->init(p._texprs)); |
| RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); |
| custom_profile()->add_info_string( |
| "Partitioner", fmt::format("Crc32HashPartitioner({})", _partition_count)); |
| } else if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) { |
| _partition_count = channels.size(); |
| custom_profile()->add_info_string( |
| "Partitioner", fmt::format("TabletSinkHashPartitioner({})", _partition_count)); |
| _partitioner = std::make_unique<vectorized::TabletSinkHashPartitioner>( |
| _partition_count, p._tablet_sink_txn_id, p._tablet_sink_schema, |
| p._tablet_sink_partition, p._tablet_sink_location, p._tablet_sink_tuple_id, this); |
| RETURN_IF_ERROR(_partitioner->init({})); |
| RETURN_IF_ERROR(_partitioner->prepare(state, {})); |
| } else if (_part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED) { |
| _partition_count = |
| channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer; |
        // Split each rebalance threshold evenly across parallel tasks; if the
        // per-task share rounds down to zero, fall back to the full configured value.
        auto scale_threshold_by_task_num = [&](int64_t threshold) -> int64_t {
            int64_t per_task = threshold / state->task_num();
            return per_task == 0 ? threshold : per_task;
        };
        _partitioner = std::make_unique<vectorized::ScaleWriterPartitioner>(
                channels.size(), _partition_count, channels.size(), 1,
                scale_threshold_by_task_num(
                        config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold),
                scale_threshold_by_task_num(
                        config::table_sink_partition_write_min_data_processed_rebalance_threshold));
| |
| RETURN_IF_ERROR(_partitioner->init(p._texprs)); |
| RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); |
| custom_profile()->add_info_string( |
| "Partitioner", fmt::format("ScaleWriterPartitioner({})", _partition_count)); |
| } |
| |
| return Status::OK(); |
| } |
| |
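// Create one channel per distinct destination fragment instance (keyed by the low
// bits of the instance id); duplicate destinations reuse the channel created for
// that instance. Destinations whose instance id is -1/-1 are not counted as working
// channels.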
| void ExchangeSinkLocalState::_create_channels() { |
| auto& p = _parent->cast<ExchangeSinkOperatorX>(); |
| std::map<int64_t, int64_t> fragment_id_to_channel_index; |
| for (int i = 0; i < p._dests.size(); ++i) { |
| const auto& fragment_instance_id = p._dests[i].fragment_instance_id; |
| if (!fragment_id_to_channel_index.contains(fragment_instance_id.lo)) { |
| channels.push_back(std::make_shared<vectorized::Channel>( |
| this, p._dests[i].brpc_server, fragment_instance_id, p._dest_node_id)); |
| fragment_id_to_channel_index.emplace(fragment_instance_id.lo, channels.size() - 1); |
| |
| if (fragment_instance_id.hi != -1 && fragment_instance_id.lo != -1) { |
| _working_channels_count++; |
| } |
| } else { |
| channels.emplace_back(channels[fragment_id_to_channel_index[fragment_instance_id.lo]]); |
| } |
| } |
| } |
| |
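// Called when a channel finishes (e.g. the receiver reported EOF or the channel was
// closed). Once the last working channel finishes, the sink marks itself as having
// reached its limit and signals the finish dependency.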
| void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) { |
| std::lock_guard<std::mutex> lock(_finished_channels_mutex); |
| |
| if (_finished_channels.contains(channel_id)) { |
| LOG(WARNING) << "Query: " << print_id(_state->query_id()) |
| << ", on_channel_finished on already finished channel: " << channel_id; |
| return; |
| } else { |
| _finished_channels.emplace(channel_id); |
| if (_working_channels_count.fetch_sub(1) == 1) { |
| set_reach_limit(); |
| if (_finish_dependency) { |
| _finish_dependency->set_ready(); |
| } |
| } |
| } |
| } |
| |
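// Open all channels and the block writer, then wire up back-pressure: a broadcast
// memory limiter for the UNPARTITIONED case with remote channels, or per-channel
// dependencies for local channels. Hash-based partitioners are opened last.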
| Status ExchangeSinkLocalState::open(RuntimeState* state) { |
| SCOPED_TIMER(exec_time_counter()); |
| SCOPED_TIMER(_open_timer); |
| RETURN_IF_ERROR(Base::open(state)); |
| _writer = std::make_unique<Writer>(); |
| |
| for (auto& channel : channels) { |
| RETURN_IF_ERROR(channel->open(state)); |
| } |
| |
| PUniqueId id; |
| id.set_hi(_state->query_id().hi); |
| id.set_lo(_state->query_id().lo); |
| |
| if ((_part_type == TPartitionType::UNPARTITIONED) && !_only_local_exchange) { |
| _broadcast_pb_mem_limiter = |
| vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_queue_dependency); |
| } else if (_last_local_channel_idx > -1) { |
| for (auto& channel : channels) { |
| if (channel->is_local()) { |
                auto dep = channel->get_local_channel_dependency();
                if (dep == nullptr) {
                    return Status::InternalError("local channel dependency is nullptr");
                }
                _local_channels_dependency.push_back(dep);
                _wait_channel_timer.push_back(common_profile()->add_nonzero_counter(
                        fmt::format("WaitForLocalExchangeBuffer{}",
                                    _local_channels_dependency.size()),
                        TUnit::TIME_NS, timer_name, 1));
| } |
| } |
| } |
| |
| if (_part_type == TPartitionType::HASH_PARTITIONED || |
| _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || |
| _part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED || |
| _part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) { |
| RETURN_IF_ERROR(_partitioner->open(state)); |
| } |
| return Status::OK(); |
| } |
| |
| std::string ExchangeSinkLocalState::name_suffix() { |
| auto& p = _parent->cast<ExchangeSinkOperatorX>(); |
| return fmt::format(exchange_sink_name_suffix, std::to_string(p._dest_node_id)); |
| } |
| |
| segment_v2::CompressionTypePB ExchangeSinkLocalState::compression_type() const { |
| return _parent->cast<ExchangeSinkOperatorX>()._compression_type; |
| } |
| |
| ExchangeSinkOperatorX::ExchangeSinkOperatorX( |
| RuntimeState* state, const RowDescriptor& row_desc, int operator_id, |
| const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations, |
| const std::vector<TUniqueId>& fragment_instance_ids) |
| : DataSinkOperatorX(operator_id, sink.dest_node_id, std::numeric_limits<int>::max()), |
| _texprs(sink.output_partition.partition_exprs), |
| _row_desc(row_desc), |
| _part_type(sink.output_partition.type), |
| _dests(destinations), |
| _dest_node_id(sink.dest_node_id), |
| _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc), |
| _tablet_sink_schema(sink.tablet_sink_schema), |
| _tablet_sink_partition(sink.tablet_sink_partition), |
| _tablet_sink_location(sink.tablet_sink_location), |
| _tablet_sink_tuple_id(sink.tablet_sink_tuple_id), |
| _tablet_sink_txn_id(sink.tablet_sink_txn_id), |
| _t_tablet_sink_exprs(&sink.tablet_sink_exprs), |
| _dest_is_merge(sink.__isset.is_merge && sink.is_merge), |
| _fragment_instance_ids(fragment_instance_ids) { |
| #ifndef BE_TEST |
| DCHECK_GT(destinations.size(), 0); |
| DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || |
| sink.output_partition.type == TPartitionType::HASH_PARTITIONED || |
| sink.output_partition.type == TPartitionType::RANDOM || |
| sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || |
| sink.output_partition.type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED || |
| sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || |
| sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED || |
| sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED); |
| #endif |
| _name = "ExchangeSinkOperatorX"; |
| _pool = std::make_shared<ObjectPool>(); |
| if (sink.__isset.output_tuple_id) { |
| _output_tuple_id = sink.output_tuple_id; |
| } |
| |
| if (_part_type != TPartitionType::UNPARTITIONED) { |
        // If every destination points to the same fragment instance, hash partitioning
        // is unnecessary; fall back to RANDOM.
| std::unordered_set<UniqueId> dest_fragment_ids_set; |
| for (const auto& dest : _dests) { |
| dest_fragment_ids_set.insert(dest.fragment_instance_id); |
| if (dest_fragment_ids_set.size() > 1) { |
| break; |
| } |
| } |
| _part_type = dest_fragment_ids_set.size() == 1 ? TPartitionType::RANDOM : _part_type; |
| } |
| } |
| |
| Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { |
| RETURN_IF_ERROR(DataSinkOperatorX::init(tsink)); |
| if (_part_type == TPartitionType::RANGE_PARTITIONED) { |
| return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); |
| } |
| if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) { |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs, |
| _tablet_sink_expr_ctxs)); |
| } |
| return Status::OK(); |
| } |
| |
| Status ExchangeSinkOperatorX::prepare(RuntimeState* state) { |
| RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::prepare(state)); |
| _state = state; |
| _compression_type = state->fragement_transmission_compression_type(); |
| if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) { |
| if (_output_tuple_id == -1) { |
| RETURN_IF_ERROR( |
| vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc())); |
| } else { |
| auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); |
| auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc)); |
| RETURN_IF_ERROR( |
| vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, *output_row_desc)); |
| } |
| RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state)); |
| } |
| |
| _init_sink_buffer(); |
| return Status::OK(); |
| } |
| |
| void ExchangeSinkOperatorX::_init_sink_buffer() { |
| std::vector<InstanceLoId> ins_ids; |
| for (auto fragment_instance_id : _fragment_instance_ids) { |
| ins_ids.push_back(fragment_instance_id.lo); |
| } |
| _sink_buffer = _create_buffer(_state, ins_ids); |
| } |
| |
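// Record the receiver-EOF status on a channel and close it. Subsequent sends skip
// channels that are marked receiver-EOF, so no further RPCs are issued to them.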
| template <typename ChannelPtrType> |
| Status ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, |
| Status st) { |
| channel->set_receiver_eof(st); |
    // A channel will not send RPCs to the downstream after receiver EOF, so close the channel with an OK status.
| return channel->close(state); |
| } |
| |
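// Dispatch one input block according to the partition type: broadcast for
// UNPARTITIONED, round-robin (preferring local channels) for RANDOM, the partition
// writer for the hash-partitioned types, and writer scaling for
// HIVE_TABLE_SINK_UNPARTITIONED. Returns EndOfFile only once every channel has
// reached receiver EOF.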
| Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { |
| auto& local_state = get_local_state(state); |
| COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); |
| SCOPED_TIMER(local_state.exec_time_counter()); |
| bool all_receiver_eof = true; |
| for (auto& channel : local_state.channels) { |
| if (!channel->is_receiver_eof()) { |
| all_receiver_eof = false; |
| break; |
| } |
| } |
| if (all_receiver_eof) { |
| return Status::EndOfFile("all data stream channels EOF"); |
| } |
| |
| if (local_state.low_memory_mode()) { |
| set_low_memory_mode(state); |
| } |
| |
| if (block->empty() && !eos) { |
| return Status::OK(); |
| } |
| |
| auto get_serializer_mem_bytes = [&local_state]() -> int64_t { |
| int64_t mem_usage = local_state._serializer.mem_usage(); |
| for (auto& channel : local_state.channels) { |
| mem_usage += channel->mem_usage(); |
| } |
| return mem_usage; |
| }; |
| |
| int64_t before_serializer_mem_bytes = get_serializer_mem_bytes(); |
| |
| auto send_to_current_channel = [&]() -> Status { |
| // 1. select channel |
| auto& current_channel = local_state.channels[local_state.current_channel_idx]; |
| if (!current_channel->is_receiver_eof()) { |
| // 2. serialize, send and rollover block |
| if (current_channel->is_local()) { |
| auto status = current_channel->send_local_block(block, eos, true); |
| HANDLE_CHANNEL_STATUS(state, current_channel, status); |
| } else { |
| auto pblock = std::make_unique<PBlock>(); |
| if (!block->empty()) { |
| RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get())); |
| } |
| auto status = current_channel->send_remote_block(std::move(pblock), eos); |
| HANDLE_CHANNEL_STATUS(state, current_channel, status); |
| } |
| } |
| return Status::OK(); |
| }; |
| |
| if (_part_type == TPartitionType::UNPARTITIONED) { |
        // 1. serialize (only needed when sending to non-local channels)
        // 2. send the block
        // 3. roll the block over for reuse
| if (local_state._only_local_exchange) { |
| Status status; |
| size_t idx = 0; |
| for (auto& channel : local_state.channels) { |
| if (!channel->is_receiver_eof()) { |
                    // If this channel is the last local channel, we can move the block to the
                    // downstream pipeline. Otherwise it still needs to be broadcast to other
                    // channels, so it must be copied.
| DCHECK_GE(local_state._last_local_channel_idx, 0); |
| status = channel->send_local_block(block, eos, |
| idx == local_state._last_local_channel_idx); |
| HANDLE_CHANNEL_STATUS(state, channel, status); |
| } |
| idx++; |
| } |
| } else { |
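            // Broadcast with at least one remote channel: serialize the block once into
            // a shared PBlock holder, send that holder to every remote channel, and hand
            // the in-memory block to local channels. The broadcast memory limiter
            // throttles how many serialized blocks may be in flight at once.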
| auto block_holder = vectorized::BroadcastPBlockHolder::create_shared(); |
| { |
| bool serialized = false; |
| RETURN_IF_ERROR(local_state._serializer.next_serialized_block( |
| block, block_holder->get_block(), local_state._rpc_channels_num, |
| &serialized, eos)); |
| if (serialized) { |
| auto cur_block = local_state._serializer.get_block()->to_block(); |
| if (!cur_block.empty()) { |
| DCHECK(eos || local_state._serializer.is_local()) << debug_string(state, 0); |
| RETURN_IF_ERROR(local_state._serializer.serialize_block( |
| &cur_block, block_holder->get_block(), |
| local_state._rpc_channels_num)); |
| } else { |
| block_holder->reset_block(); |
| } |
| |
| local_state._broadcast_pb_mem_limiter->acquire(*block_holder); |
| |
| size_t idx = 0; |
| bool moved = false; |
| for (auto& channel : local_state.channels) { |
| if (!channel->is_receiver_eof()) { |
| Status status; |
| if (channel->is_local()) { |
                            // If this channel is the last local channel, we can move the block to
                            // the downstream pipeline. Otherwise it still needs to be broadcast to
                            // other channels, so it must be copied.
| DCHECK_GE(local_state._last_local_channel_idx, 0); |
| status = channel->send_local_block( |
| &cur_block, eos, |
| idx == local_state._last_local_channel_idx); |
| moved = idx == local_state._last_local_channel_idx; |
| } else { |
| status = channel->send_broadcast_block(block_holder, eos); |
| } |
| HANDLE_CHANNEL_STATUS(state, channel, status); |
| } |
| idx++; |
| } |
| if (moved || state->low_memory_mode()) { |
| local_state._serializer.reset_block(); |
| } else { |
| cur_block.clear_column_data(); |
| local_state._serializer.get_block()->set_mutable_columns( |
| cur_block.mutate_columns()); |
| } |
| } |
| } |
| } |
| } else if (_part_type == TPartitionType::RANDOM) { |
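        // For RANDOM distribution, prefer local channels whenever any exist: pick the
        // first local channel id at or after the current index, wrapping around to the
        // first local channel if needed.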
| if (!local_state.local_channel_ids.empty()) { |
| const auto& ids = local_state.local_channel_ids; |
| // Find the first channel ID >= current_channel_idx |
| auto it = std::lower_bound(ids.begin(), ids.end(), local_state.current_channel_idx); |
| if (it != ids.end()) { |
| local_state.current_channel_idx = *it; |
| } else { |
| // All IDs are < current_channel_idx; wrap around to the first one |
| local_state.current_channel_idx = ids[0]; |
| } |
| } |
| RETURN_IF_ERROR(send_to_current_channel()); |
| local_state.current_channel_idx = |
| (local_state.current_channel_idx + 1) % local_state.channels.size(); |
| } else if (_part_type == TPartitionType::HASH_PARTITIONED || |
| _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || |
| _part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED || |
| _part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED) { |
| RETURN_IF_ERROR(local_state._writer->write(&local_state, state, block, eos)); |
| } else if (_part_type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED) { |
        // Scale the number of active channels with the amount of data processed so far,
        // which in turn controls the number of table sink writers.
| RETURN_IF_ERROR(send_to_current_channel()); |
| _data_processed += block->bytes(); |
| if (_writer_count < local_state.channels.size()) { |
| if (_data_processed >= |
| _writer_count * |
| config::table_sink_non_partition_write_scaling_data_processed_threshold) { |
| _writer_count++; |
| } |
| } |
| local_state.current_channel_idx = (local_state.current_channel_idx + 1) % _writer_count; |
    } else {
        // Range partitioning is rejected in init(), so no partition type should reach
        // this branch.
    }
| |
| int64_t after_serializer_mem_bytes = get_serializer_mem_bytes(); |
| |
| int64_t delta_mem_bytes = after_serializer_mem_bytes - before_serializer_mem_bytes; |
| COUNTER_UPDATE(local_state.memory_used_counter(), delta_mem_bytes); |
| |
| Status final_st = Status::OK(); |
| if (eos) { |
| COUNTER_UPDATE(local_state.memory_used_counter(), -after_serializer_mem_bytes); |
| local_state._serializer.reset_block(); |
| for (auto& channel : local_state.channels) { |
| Status st = channel->close(state); |
| /** |
| * Consider this case below: |
| * |
| * +--- Channel0 (Running) |
| * | |
| * ExchangeSink ---+--- Channel1 (EOF) |
| * | |
| * +--- Channel2 (Running) |
| * |
             * Channel1 has reached EOF and its close() returns `END_OF_FILE` here. However,
             * Channel0 and Channel2 still need new data. If ExchangeSink returned EOF now,
             * downstream tasks would no longer receive blocks, including the EOS signal.
             * So we must return EOF only when all channels have reached EOF.
| */ |
| if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>() && final_st.ok()) { |
| final_st = st; |
| } |
| } |
| } |
| return final_st; |
| } |
| |
| void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buffer) { |
| for (auto& channel : channels) { |
| channel->set_exchange_buffer(buffer); |
| } |
| } |
| |
| std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { |
| fmt::memory_buffer debug_string_buffer; |
| fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); |
| if (_sink_buffer) { |
| fmt::format_to(debug_string_buffer, |
| ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: " |
| "{}, queue dep: {}), _reach_limit: {}, working channels: {}, total " |
| "channels: {}, remote channels: {}, each queue size: {}", |
| _sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size, |
| _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), |
| _reach_limit.load(), _working_channels_count.load(), channels.size(), |
| _rpc_channels_num, _sink_buffer->debug_each_instance_queue_size()); |
| } |
| return fmt::to_string(debug_string_buffer); |
| } |
| |
| Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { |
| if (_closed) { |
| return Status::OK(); |
| } |
| SCOPED_TIMER(exec_time_counter()); |
| if (_partitioner) { |
| RETURN_IF_ERROR(_partitioner->close(state)); |
| } |
| SCOPED_TIMER(_close_timer); |
| if (_queue_dependency) { |
| COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time()); |
| } |
| |
| COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); |
| for (size_t i = 0; i < _local_channels_dependency.size(); i++) { |
| COUNTER_UPDATE(_wait_channel_timer[i], |
| _local_channels_dependency[i]->watcher_elapse_time()); |
| } |
| if (_sink_buffer) { |
| _sink_buffer->update_profile(custom_profile()); |
| _sink_buffer->close(); |
| } |
| return Base::close(state, exec_status); |
| } |
| |
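// Build an ExchangeSinkBuffer tagged with the query id and pre-construct one RPC
// request per destination fragment instance.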
| std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer( |
| RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids) { |
| PUniqueId id; |
| id.set_hi(_state->query_id().hi); |
| id.set_lo(_state->query_id().lo); |
| auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, _node_id, state, |
| sender_ins_ids); |
| for (const auto& _dest : _dests) { |
| sink_buffer->construct_request(_dest.fragment_instance_id); |
| } |
| return sink_buffer; |
| } |
| |
| // For a normal shuffle scenario, if the concurrency is n, |
| // there can be up to n * n RPCs in the current fragment. |
| // Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs. |
| // (Note: This does not reduce the total number of RPCs.) |
| // In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed. |
| std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer( |
| RuntimeState* state, InstanceLoId sender_ins_id) { |
| // When the child is SortSourceOperatorX or LocalExchangeSourceOperatorX, |
| // it is an order-by scenario. |
| // In this case, there is only one target instance, and no n * n RPC concurrency will occur. |
| // Therefore, sharing a sink buffer is not necessary. |
| if (_dest_is_merge) { |
| return _create_buffer(state, {sender_ins_id}); |
| } |
| if (_state->enable_shared_exchange_sink_buffer()) { |
| return _sink_buffer; |
| } |
| return _create_buffer(state, {sender_ins_id}); |
| } |
#include "common/compile_check_end.h"
} // namespace doris::pipeline