// blob: 38080010057d94c4a6003743474efcad47ab8d46 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "multi_cast_data_streamer.h"
#include <fmt/format.h>
#include <glog/logging.h>
#include <iterator>
#include <memory>
#include <vector>
#include "common/config.h"
#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
#include "pipeline/exec/spill_utils.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/pretty_printer.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
// Builds a queue entry from `block`.
//
// The entry owns a new block created from the columns (with type and name) of
// `block`; `block` itself is cleared afterwards so the caller is left with an
// empty block.
//
// @param block           source block; cleared on return.
// @param un_finish_copy  initial value of the per-entry outstanding-copy counter.
// @param mem_size        byte size recorded for this entry.
MultiCastBlock::MultiCastBlock(vectorized::Block* block, int un_finish_copy, size_t mem_size)
        : _un_finish_copy(un_finish_copy), _mem_size(mem_size) {
    auto captured_columns = block->get_columns_with_type_and_name();
    _block = vectorized::Block::create_unique(std::move(captured_columns));
    block->clear();
}
// Pulls the next block for consumer `sender_idx`.
//
// Sources are tried in this order:
//   1. blocks already recovered from spill (`_cached_blocks[sender_idx]`);
//   2. spill readers that still hold data (`_spill_readers[sender_idx]`) - a
//      batch is read back into `_cached_blocks[sender_idx]` and the call
//      returns without filling `block`;
//   3. the in-memory queue (`_multi_cast_blocks`).
//
// `*eos` is written only on paths 1 and 3. When the front spill stream is not
// ready for reading, or after a recovery batch has been run, the function
// returns without touching `*eos`.
//
// @param state       runtime state, used for logging and for the recovery runnable.
// @param sender_idx  index of the consumer.
// @param block       output block.
// @param eos         output end-of-stream flag (see above).
// @return            error from opening/reading a spill reader, from the
//                    recovery runnable, or from `_copy_block`; OK otherwise.
Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block,
                                   bool* eos) {
    MultiCastBlock* block_to_copy = nullptr;
    {
        INJECT_MOCK_SLEEP(std::unique_lock l(_mutex));
        auto& readers = _spill_readers[sender_idx];
        auto& recovered = _cached_blocks[sender_idx];

        // Drop every spill reader whose data has been read completely.
        auto reader_it = readers.begin();
        while (reader_it != readers.end()) {
            if ((*reader_it)->all_data_read) {
                reader_it = readers.erase(reader_it);
            } else {
                ++reader_it;
            }
        }

        // 1. Serve a block that was previously recovered from spill.
        if (!recovered.empty()) {
            *block = std::move(recovered.front());
            recovered.erase(recovered.begin());

            // End of stream requires all of:
            //  - `_eos`: no more data will be added into the queue;
            //  - no recovered blocks left for this sender;
            //  - no spill readers left for this sender (nothing on disk);
            //  - this sender's read position is at the end of the queue.
            const bool queue_drained = _sender_pos_to_read[sender_idx] == _multi_cast_blocks.end();
            *eos = _eos && recovered.empty() && readers.empty() && queue_drained;
            return Status::OK();
        }

        // 2. Recover a batch of blocks from the oldest spill reader.
        if (!readers.empty()) {
            auto reader_item = readers.front();
            if (!reader_item->stream->ready_for_reading()) {
                return Status::OK();
            }

            auto& reader = reader_item->reader;
            RETURN_IF_ERROR(reader->open());
            // A non-zero offset is applied once and then reset.
            if (reader_item->block_offset != 0) {
                reader->seek(reader_item->block_offset);
                reader_item->block_offset = 0;
            }

            auto recover_func = [this, reader_item, sender_idx]() {
                // A batch ends after this many cached blocks or this many bytes read.
                constexpr size_t max_cached_blocks = 32;
                constexpr size_t max_batch_bytes = 2 * 1024 * 1024;

                vectorized::Block recovered_block;
                bool stream_eos = false;
                size_t bytes_read = 0;
                while (!stream_eos) {
                    RETURN_IF_ERROR(reader_item->reader->read(&recovered_block, &stream_eos));
                    if (recovered_block.empty()) {
                        continue;
                    }
                    std::lock_guard l(_mutex);
                    bytes_read += recovered_block.allocated_bytes();
                    _cached_blocks[sender_idx].emplace_back(std::move(recovered_block));
                    if (_cached_blocks[sender_idx].size() >= max_cached_blocks ||
                        bytes_read > max_batch_bytes) {
                        break;
                    }
                }

                if (stream_eos || !_cached_blocks[sender_idx].empty()) {
                    reader_item->all_data_read = stream_eos;
                    _set_ready_for_read(sender_idx);
                }
                return Status::OK();
            };

            auto catch_exception_func = [recover_func = std::move(recover_func)]() {
                RETURN_IF_CATCH_EXCEPTION(return recover_func(););
            };

            // The recovery function takes `_mutex` itself, so release it first.
            l.unlock();
            SpillRecoverRunnable spill_runnable(state, _source_operator_profiles[sender_idx],
                                                catch_exception_func);
            return spill_runnable.run();
        }

        // 3. Serve from the in-memory queue.
        auto& read_pos = _sender_pos_to_read[sender_idx];
        const auto queue_end = _multi_cast_blocks.end();
        if (read_pos == queue_end) {
            _block_reading(sender_idx);
            VLOG_DEBUG << "Query: " << print_id(state->query_id())
                       << ", pos_to_pull end: " << static_cast<void*>(_write_dependency);
            *eos = _eos;
            return Status::OK();
        }

        DCHECK_GT(read_pos->_un_finish_copy, 0);
        DCHECK_LE(read_pos->_un_finish_copy, _cast_sender_count);

        *block = *read_pos->_block;
        block_to_copy = &(*read_pos);
        // Register the in-flight copy before the lock is released.
        _copying_count.fetch_add(1);
        ++read_pos;
        if (read_pos == queue_end) {
            _block_reading(sender_idx);
            *eos = _eos;
        }
    }
    return _copy_block(state, sender_idx, block, *block_to_copy);
}
// Finishes one consumer's read of `multi_cast_block`.
//
// Every column of `block` is replaced by `clone_resized(rows)` of itself; this
// runs before `_mutex` is taken. Then, under `_mutex`, the entry's
// outstanding-copy counter and `_copying_count` are decremented:
//   - if this was the last outstanding copy, the entry's recorded byte size is
//     subtracted from `_cumulative_mem_size`, the entry is popped from the
//     front of the queue, and the write dependency is set ready;
//   - otherwise, if no other copy is in flight, a spill is attempted.
//
// @param state             runtime state forwarded to the spill trigger.
// @param sender_idx        index of the consumer (not used in this function).
// @param block             block whose columns are cloned in place.
// @param multi_cast_block  queue entry `block` was taken from; it must not be
//                          used after this call because it may have been popped.
// @return                  error from `_trigger_spill_if_need`, OK otherwise.
Status MultiCastDataStreamer::_copy_block(RuntimeState* state, int32_t sender_idx,
                                          vectorized::Block* block,
                                          MultiCastBlock& multi_cast_block) {
    const auto rows = block->rows();
    for (int i = 0; i < block->columns(); ++i) {
        block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows);
    }

    INJECT_MOCK_SLEEP(std::lock_guard l(_mutex));
    multi_cast_block._un_finish_copy--;
    auto copying_count = _copying_count.fetch_sub(1) - 1;
    if (multi_cast_block._un_finish_copy == 0) {
        DCHECK_EQ(_multi_cast_blocks.front()._un_finish_copy, 0);
        DCHECK_EQ(&(_multi_cast_blocks.front()), &multi_cast_block);
        // Give back the bytes that push() added for this entry. Without this
        // the counter only grows until the next spill resets it. Must happen
        // before pop_front(), which destroys `multi_cast_block`.
        _cumulative_mem_size -= multi_cast_block._mem_size;
        _multi_cast_blocks.pop_front();
        _write_dependency->set_ready();
    } else if (copying_count == 0) {
        bool spilled = false;
        RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled));
    }

    return Status::OK();
}
// Spills the whole in-memory queue to a new spill stream when all of these hold:
//   - spill is enabled on `state`;
//   - `_cumulative_mem_size` has reached `config::exchg_node_buffer_size_bytes`
//     and at least 4 blocks are queued;
//   - no block copy is in flight (`_copying_count == 0`);
//   - at least one sender's read position is at the end of the queue.
//
// Once the size/count threshold is met the write dependency is blocked, even
// if the later conditions stop the spill from happening.
//
// Both call sites in this file invoke this while holding `_mutex`.
//
// @param state      runtime state (spill switch, query id, stream registration).
// @param triggered  set to true only when a spill was started, false otherwise.
// @return           error from registering the stream or starting the spill task.
Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state, bool* triggered) {
    *triggered = false;
    if (!state->enable_spill()) {
        return Status::OK();
    }

    const bool over_budget = _cumulative_mem_size.load() >= config::exchg_node_buffer_size_bytes;
    if (!over_budget || _multi_cast_blocks.size() < 4) {
        return Status::OK();
    }

    _write_dependency->block();
    if (_copying_count.load() != 0) {
        return Status::OK();
    }

    // Record how far each sender has advanced into the queue.
    const size_t queued_blocks = _multi_cast_blocks.size();
    std::vector<int64_t> offsets(_cast_sender_count);
    bool any_sender_at_end = false;
    for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
        offsets[i] = std::distance(_multi_cast_blocks.begin(), _sender_pos_to_read[i]);
        if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
            any_sender_at_end = true;
            CHECK_EQ(offsets[i], queued_blocks);
        }

        // A sender that still has spill readers must not have advanced in the queue.
        if (!_spill_readers[i].empty()) {
            CHECK_EQ(offsets[i], 0);
        }
    }

    if (!any_sender_at_end) {
        return Status::OK();
    }

    vectorized::SpillStreamSPtr spill_stream;
    RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
            state, spill_stream, print_id(state->query_id()), "MultiCastSender", _node_id,
            std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(),
            _sink_operator_profile));

    for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
        // Senders that have not consumed the whole queue get a reader on the
        // new stream, starting at their current offset.
        if (offsets[i] < queued_blocks) {
            auto reader = spill_stream->create_separate_reader();
            reader->set_counters(_source_operator_profiles[i]);
            auto reader_item = std::make_shared<SpillingReader>(std::move(reader), spill_stream,
                                                                offsets[i], false);
            _spill_readers[i].emplace_back(std::move(reader_item));
        }

        _block_reading(i);
    }

    RETURN_IF_ERROR(_start_spill_task(state, spill_stream));
    DCHECK_EQ(_multi_cast_blocks.size(), 0);

    // The queue is empty now: park every sender at end() and restart accounting.
    for (auto& pos : _sender_pos_to_read) {
        pos = _multi_cast_blocks.end();
    }
    _cumulative_mem_size = 0;
    *triggered = true;
    return Status::OK();
}
// Moves every queued block out of `_multi_cast_blocks`, clears the queue, and
// runs a spill-sink runnable that writes the blocks to `spill_stream`.
//
// The write loop stops early when the query is cancelled or a block fails to
// spill; in the non-error case the stream is finished with `spill_eof()`.
// After the write attempt the write dependency is set ready; on success every
// sender is marked ready for reading, on failure a warning is logged.
//
// @param state         runtime state (cancellation, query id).
// @param spill_stream  stream to write the blocks to.
// @return              status reported by the spill-sink runnable.
Status MultiCastDataStreamer::_start_spill_task(RuntimeState* state,
                                                vectorized::SpillStreamSPtr spill_stream) {
    std::vector<vectorized::Block> blocks;
    for (auto& entry : _multi_cast_blocks) {
        DCHECK_GT(entry._block->rows(), 0);
        blocks.emplace_back(std::move(*entry._block));
    }
    _multi_cast_blocks.clear();

    auto write_func = [state, blocks = std::move(blocks),
                       spill_stream = std::move(spill_stream)]() mutable {
        const auto blocks_count = blocks.size();
        for (auto& queued : blocks) {
            if (state->is_cancelled()) {
                break;
            }
            // Move the block out so its memory is released after each write.
            auto block = std::move(queued);
            RETURN_IF_ERROR(spill_stream->spill_block(state, block, false));
        }
        VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " multi cast write "
                   << blocks_count << " blocks";
        return spill_stream->spill_eof();
    };

    auto exception_catch_func = [write_func = std::move(write_func),
                                 query_id = print_id(state->query_id()), this]() mutable {
        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return write_func()); }();
        _write_dependency->set_ready();

        if (status.ok()) {
            for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
                _set_ready_for_read(i);
            }
        } else {
            LOG(WARNING) << "Query: " << query_id
                         << " multi cast write failed: " << status.to_string();
        }
        return status;
    };

    return SpillSinkRunnable(state, nullptr, _sink_operator_profile, exception_catch_func).run();
}
// Appends `block` to the multicast queue.
//
// Under `_mutex`:
//   - a block left pending by an earlier call (one that triggered a spill) is
//     appended to the queue first;
//   - `_cumulative_mem_size` and the peak counter are updated;
//   - a non-empty, non-final block may trigger a spill; if it does, the block
//     is kept as the new pending block and the call returns early;
//   - otherwise a non-empty block is appended and senders parked at the end of
//     the queue are pointed at it and marked ready;
//   - an empty final block just marks the parked senders ready.
// After the lock is released, if end-of-stream was recorded, every read
// dependency is set always-ready.
//
// @param state  runtime state forwarded to the spill trigger.
// @param block  input block; its columns are taken and it is cleared when it
//               is queued or kept as pending.
// @param eos    true when this is the last push.
// @return       error from `_trigger_spill_if_need`, OK otherwise.
Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
    auto rows = block->rows();
    COUNTER_UPDATE(_process_rows, rows);

    const auto block_mem_size = block->allocated_bytes();
    {
        INJECT_MOCK_SLEEP(std::lock_guard l(_mutex));

        // Points every sender parked at end() at the newly appended tail entry
        // and marks it ready for reading.
        auto attach_parked_senders_to_tail = [this]() {
            auto tail = std::prev(_multi_cast_blocks.end());
            for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
                if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
                    _sender_pos_to_read[i] = tail;
                    _set_ready_for_read(i);
                }
            }
        };

        if (_pending_block) {
            DCHECK_GT(_pending_block->rows(), 0);
            const auto pending_size = _pending_block->allocated_bytes();
            _cumulative_mem_size += pending_size;
            _multi_cast_blocks.emplace_back(_pending_block.get(), _cast_sender_count, pending_size);
            _pending_block.reset();
            attach_parked_senders_to_tail();
        }

        _cumulative_mem_size += block_mem_size;
        COUNTER_SET(_peak_mem_usage,
                    std::max(_cumulative_mem_size.load(), _peak_mem_usage->value()));

        if (rows > 0) {
            if (!eos) {
                bool spilled = false;
                RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled));
                if (spilled) {
                    // The queue was just spilled: hold this block back until the next push.
                    _pending_block = vectorized::Block::create_unique(
                            block->get_columns_with_type_and_name());
                    block->clear();
                    return Status::OK();
                }
            }

            _multi_cast_blocks.emplace_back(block, _cast_sender_count, block_mem_size);
            attach_parked_senders_to_tail();
        } else if (eos) {
            // Nothing to append; wake parked senders so they can observe end-of-stream.
            for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
                if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
                    _set_ready_for_read(i);
                }
            }
        }
        _eos = eos;
    }

    if (_eos) {
        for (auto* read_dep : _dependencies) {
            read_dep->set_always_ready();
        }
    }
    return Status::OK();
}
// Sets the read dependency of consumer `sender_idx` ready.
// Does nothing when no dependencies are registered.
void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
    if (!_dependencies.empty()) {
        auto* read_dep = _dependencies[sender_idx];
        DCHECK(read_dep);
        read_dep->set_ready();
    }
}
// Blocks the read dependency of consumer `sender_idx`.
// Does nothing when no dependencies are registered.
void MultiCastDataStreamer::_block_reading(int sender_idx) {
    if (!_dependencies.empty()) {
        auto* read_dep = _dependencies[sender_idx];
        DCHECK(read_dep);
        read_dep->block();
    }
}
std::string MultiCastDataStreamer::debug_string() {
size_t read_ready_count = 0;
size_t pos_at_end_count = 0;
size_t blocks_count = 0;
{
std::unique_lock l(_mutex);
blocks_count = _multi_cast_blocks.size();
for (int32_t i = 0; i != _cast_sender_count; ++i) {
if (!_dependencies[i]->is_blocked_by()) {
read_ready_count++;
}
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
pos_at_end_count++;
}
}
}
fmt::memory_buffer debug_string_buffer;
fmt::format_to(
debug_string_buffer,
"MemSize: {}, blocks: {}, sender count: {}, pos_at_end_count: {}, copying_count: {} "
"read_ready_count: {}",
PrettyPrinter::print_bytes(_cumulative_mem_size), blocks_count, _cast_sender_count,
pos_at_end_count, _copying_count.load(), read_ready_count);
return fmt::to_string(debug_string_buffer);
}
} // namespace doris::pipeline