| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/exchange/local_exchange_sink_operator.h" |
| |
| #include <memory> |
| |
| #include "exec/exchange/local_exchanger.h" |
| #include "exec/exchange/vdata_stream_sender.h" |
| #include "exec/partitioner/partitioner.h" |
| |
| namespace doris { |
| |
| LocalExchangeSinkLocalState::~LocalExchangeSinkLocalState() = default; |
| |
| std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const { |
| auto deps = Base::dependencies(); |
| |
| auto* dep = _shared_state->get_sink_dep_by_channel_id(_channel_id); |
| if (dep != nullptr) { |
| deps.push_back(dep); |
| } |
| return deps; |
| } |
| |
| Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type, |
| const int num_buckets, const bool use_global_hash_shuffle, |
| const std::map<int, int>& shuffle_idx_to_instance_idx) { |
| _name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(type) + ")"; |
| _type = type; |
| if (_type == ExchangeType::HASH_SHUFFLE) { |
| _shuffle_idx_to_instance_idx.clear(); |
| _use_global_shuffle = use_global_hash_shuffle; |
| // For shuffle join, if data distribution has been broken by previous operator, we |
| // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned, |
| // we should use map shuffle idx to instance idx because all instances will be |
| // distributed to all BEs. Otherwise, we should use shuffle idx directly. |
| if (use_global_hash_shuffle) { |
| _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx; |
| } else { |
| for (int i = 0; i < _num_partitions; i++) { |
| _shuffle_idx_to_instance_idx[i] = i; |
| } |
| } |
| if (state->query_options().__isset.enable_new_shuffle_hash_method && |
| state->query_options().enable_new_shuffle_hash_method) { |
| _partitioner = std::make_unique<Crc32CHashPartitioner>(_num_partitions); |
| } else { |
| _partitioner = |
| std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(_num_partitions); |
| } |
| RETURN_IF_ERROR(_partitioner->init(_texprs)); |
| } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) { |
| DCHECK_GT(num_buckets, 0); |
| _partitioner = std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(num_buckets); |
| RETURN_IF_ERROR(_partitioner->init(_texprs)); |
| } |
| return Status::OK(); |
| } |
| |
| Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) { |
| RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::prepare(state)); |
| if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { |
| RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); |
| RETURN_IF_ERROR(_partitioner->open(state)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { |
| RETURN_IF_ERROR(Base::init(state, info)); |
| SCOPED_TIMER(exec_time_counter()); |
| SCOPED_TIMER(_init_timer); |
| _compute_hash_value_timer = ADD_TIMER(custom_profile(), "ComputeHashValueTime"); |
| _distribute_timer = ADD_TIMER(custom_profile(), "DistributeDataTime"); |
| if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) { |
| custom_profile()->add_info_string( |
| "UseGlobalShuffle", |
| std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle)); |
| } |
| custom_profile()->add_info_string( |
| "PartitionExprsSize", |
| std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._partitioned_exprs_num)); |
| _channel_id = info.task_idx; |
| return Status::OK(); |
| } |
| |
| Status LocalExchangeSinkLocalState::open(RuntimeState* state) { |
| SCOPED_TIMER(exec_time_counter()); |
| SCOPED_TIMER(_open_timer); |
| RETURN_IF_ERROR(Base::open(state)); |
| _exchanger = _shared_state->exchanger.get(); |
| DCHECK(_exchanger != nullptr); |
| |
| if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE || |
| _exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) { |
| auto& p = _parent->cast<LocalExchangeSinkOperatorX>(); |
| RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { |
| SCOPED_TIMER(Base::exec_time_counter()); |
| SCOPED_TIMER(Base::_close_timer); |
| if (Base::_closed) { |
| return Status::OK(); |
| } |
| if (_shared_state) { |
| _shared_state->sub_running_sink_operators(); |
| } |
| return Base::close(state, exec_status); |
| } |
| |
| std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const { |
| fmt::memory_buffer debug_string_buffer; |
| fmt::format_to(debug_string_buffer, |
| "{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, " |
| "_num_senders: {}, _num_sources: {}, " |
| "_running_sink_operators: {}, _running_source_operators: {}", |
| Base::debug_string(indentation_level), |
| _parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id, |
| _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, |
| _exchanger->_running_sink_operators, _exchanger->_running_source_operators); |
| return fmt::to_string(debug_string_buffer); |
| } |
| |
| Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, Block* in_block, bool eos) { |
| auto& local_state = get_local_state(state); |
| SCOPED_TIMER(local_state.exec_time_counter()); |
| COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); |
| |
| if (state->low_memory_mode()) { |
| set_low_memory_mode(state); |
| } |
| SinkInfo sink_info = {.channel_id = &local_state._channel_id, |
| .partitioner = local_state._partitioner.get(), |
| .local_state = &local_state, |
| .shuffle_idx_to_instance_idx = &_shuffle_idx_to_instance_idx}; |
| RETURN_IF_ERROR(local_state._exchanger->sink( |
| state, in_block, eos, |
| {local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr}, |
| sink_info)); |
| |
| // If all exchange sources ended due to limit reached, current task should also finish |
| if (local_state._exchanger->_running_source_operators == 0) { |
| return Status::EndOfFile("receiver eof"); |
| } |
| |
| return Status::OK(); |
| } |
| } // namespace doris |