blob: ad092656f217937a91fb75f3bef8b1117b036104 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/exchange/local_exchange_source_operator.h"
#include "exec/exchange/local_exchanger.h"
namespace doris {
Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_channel_id = info.task_idx;
_shared_state->mem_counters[_channel_id] = _memory_used_counter;
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);
_get_block_failed_counter =
ADD_COUNTER_WITH_LEVEL(custom_profile(), "GetBlockFailedTime", TUnit::UNIT, 1);
if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE ||
_exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) {
_copy_data_timer = ADD_TIMER(custom_profile(), "CopyDataTime");
}
return Status::OK();
}
Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
for (size_t i = 0; i < _local_merge_deps.size(); i++) {
COUNTER_SET(_deps_counter[i], _local_merge_deps[i]->watcher_elapse_time());
}
if (_exchanger) {
_exchanger->close({_channel_id, this});
}
if (_shared_state) {
_shared_state->sub_running_source_operators();
}
std::vector<DependencySPtr> {}.swap(_local_merge_deps);
return Base::close(state);
}
std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
if ((_exchanger->get_type() == ExchangeType::PASS_TO_ONE) && _channel_id != 0) {
// If this is a PASS_TO_ONE exchange and is not the first task, source operators always
// return empty result so no dependencies here.
return {};
} else {
return Base::dependencies();
}
}
std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}, "
"data queue info: {}",
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators,
_shared_state->mem_usage.load(),
_exchanger->data_queue_debug_string(_channel_id));
size_t i = 0;
fmt::format_to(debug_string_buffer, ", MemTrackers: ");
for (auto* mem_counter : _shared_state->mem_counters) {
fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_counter->value());
i++;
}
return fmt::to_string(debug_string_buffer);
}
Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._exchanger->get_block(
state, block, eos, {nullptr, nullptr, local_state._copy_data_timer},
{local_state._channel_id, &local_state}));
local_state.reached_limit(block, eos);
return Status::OK();
}
} // namespace doris