// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/runtime/vdata_stream_recvr.h"
#include <fmt/format.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/data.pb.h>
#include <algorithm>
#include <functional>
#include <string>
#include "common/logging.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/core/materialize_block.h"
#include "vec/core/sort_cursor.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/runtime/vsorted_run_merger.h"
namespace doris::vectorized {
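
// SenderQueue buffers blocks arriving from `num_senders` upstream senders and hands
// them to the consumer through get_batch(). Queue state is protected by _lock.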
VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders,
RuntimeProfile* profile)
: _recvr(parent_recvr),
_is_cancelled(false),
_num_remaining_senders(num_senders),
_received_first_batch(false) {
_cancel_status = Status::OK();
}
VDataStreamRecvr::SenderQueue::~SenderQueue() {
    // _pending_closures should already be empty here; the DCHECK below guards that.
    // Each closure deletes itself inside Run(), so a closure that is never run would
    // leak brpc memory. Run any leftover closures as a safety net.
DCHECK(_pending_closures.empty());
for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
}
_pending_closures.clear();
}
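
// Returns true while the consumer still has to wait: the queue is not cancelled,
// currently empty, and at least one sender has not yet reported EOS.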
bool VDataStreamRecvr::SenderQueue::should_wait() {
std::unique_lock<std::mutex> l(_lock);
return !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0;
}
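
// Blocks the calling thread until data arrives, all senders finish, or the queue is
// cancelled, then hands off to _inner_get_batch_without_lock() while still holding _lock.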
Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
std::unique_lock<std::mutex> l(_lock);
// wait until something shows up or we know we're done
while (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
VLOG_ROW << "wait arrival fragment_instance_id=" << print_id(_recvr->fragment_instance_id())
<< " node=" << _recvr->dest_node_id();
// Don't count time spent waiting on the sender as active time.
CANCEL_SAFE_SCOPED_TIMER(_recvr->_data_arrival_timer, &_is_cancelled);
CANCEL_SAFE_SCOPED_TIMER(
_received_first_batch ? nullptr : _recvr->_first_batch_wait_total_timer,
&_is_cancelled);
_data_arrival_cv.wait(l);
}
return _inner_get_batch_without_lock(block, eos);
}
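
// Pops the front block of _block_queue into `block` and sets *eos once the queue is
// drained and no senders remain. Also releases one pending sender closure, if any,
// now that queue space has been freed. Expects _lock to be held by the caller.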
Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) {
if (_is_cancelled) {
RETURN_IF_ERROR(_cancel_status);
return Status::Cancelled("Cancelled");
}
if (_block_queue.empty()) {
DCHECK_EQ(_num_remaining_senders, 0);
*eos = true;
return Status::OK();
}
_received_first_batch = true;
DCHECK(!_block_queue.empty());
auto [next_block, block_byte_size] = std::move(_block_queue.front());
_recvr->update_blocks_memory_usage(-block_byte_size);
_block_queue.pop_front();
_record_debug_info();
if (_block_queue.empty() && _dependency) {
if (!_is_cancelled && _num_remaining_senders > 0) {
_dependency->block();
}
if (_local_channel_dependency) {
_local_channel_dependency->set_ready();
}
}
if (!_pending_closures.empty()) {
auto closure_pair = _pending_closures.front();
closure_pair.first->Run();
_pending_closures.pop_front();
closure_pair.second.stop();
_recvr->_buffer_full_total_timer->update(closure_pair.second.elapsed_time());
}
block->swap(*next_block);
*eos = false;
return Status::OK();
}
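
// Marks the source dependency ready when there is no longer a reason to wait
// (data queued, all senders done, or cancelled). Expects _lock to be held.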
void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
if (!_dependency) {
return;
}
const bool should_wait = !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0;
if (!should_wait) {
_dependency->set_ready();
}
}
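
// Deserializes a block received over brpc and appends it to _block_queue. Packets whose
// packet_seq is not greater than the last one seen for `be_number` are dropped as
// duplicates. If the receiver exceeds its memory limit and `done` is provided, the
// closure is parked in _pending_closures so the response (and thus the sender) is
// delayed until the consumer pops a block or the queue is cancelled/closed.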
Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
int64_t packet_seq,
::google::protobuf::Closure** done) {
{
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return Status::OK();
}
auto iter = _packet_seq_map.find(be_number);
if (iter != _packet_seq_map.end()) {
if (iter->second >= packet_seq) {
LOG(WARNING) << fmt::format(
"packet already exist [cur_packet_id= {} receive_packet_id={}]",
iter->second, packet_seq);
return Status::OK();
}
iter->second = packet_seq;
} else {
_packet_seq_map.emplace(be_number, packet_seq);
}
auto pblock_byte_size = pblock.ByteSizeLong();
COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size);
DCHECK(_num_remaining_senders >= 0);
if (_num_remaining_senders == 0) {
DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
return Status::OK();
}
}
BlockUPtr block = nullptr;
int64_t deserialize_time = 0;
{
SCOPED_RAW_TIMER(&deserialize_time);
block = Block::create_unique();
RETURN_IF_ERROR(block->deserialize(pblock));
}
auto block_byte_size = block->allocated_bytes();
VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" << block_byte_size << "\n";
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return Status::OK();
}
COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
bool empty = !block->rows();
if (!empty) {
_block_queue.emplace_back(std::move(block), block_byte_size);
_record_debug_info();
try_set_dep_ready_without_lock();
}
    // If done is nullptr, the caller does not allow this response to be delayed,
    // so no backpressure can be applied here.
if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
MonotonicStopWatch monotonicStopWatch;
monotonicStopWatch.start();
DCHECK(*done != nullptr);
_pending_closures.emplace_back(*done, monotonicStopWatch);
*done = nullptr;
}
_recvr->update_blocks_memory_usage(block_byte_size);
if (!empty) {
_data_arrival_cv.notify_one();
}
return Status::OK();
}
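
// Adds a block from a local sender in the same process. The data is either moved or
// deep-copied depending on `use_move`, materialized, and queued. If the receiver
// exceeds its memory limit, the calling thread blocks on a per-thread closure until
// the consumer frees space.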
void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
{
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled || !block->rows()) {
return;
}
}
auto block_bytes_received = block->bytes();
    // Has to use a unique_ptr here, because cloning a column may fail if memory
    // allocation fails.
BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name());
    // For local exchange, the block content must be copied when use_move == false.
if (use_move) {
block->clear();
} else {
auto rows = block->rows();
for (int i = 0; i < nblock->columns(); ++i) {
nblock->get_by_position(i).column =
nblock->get_by_position(i).column->clone_resized(rows);
}
}
materialize_block_inplace(*nblock);
size_t block_mem_size = nblock->allocated_bytes();
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled) {
return;
}
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_bytes_received);
COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
bool empty = !nblock->rows();
if (!empty) {
_block_queue.emplace_back(std::move(nblock), block_mem_size);
_record_debug_info();
try_set_dep_ready_without_lock();
_data_arrival_cv.notify_one();
}
    // Careful: members of _recvr allocated from the object pool must be accessed before
    // the logic below, because _lock is released inside `iter->second->wait(l)`; after
    // that call returns, _recvr may already have been closed and its resources released.
_recvr->update_blocks_memory_usage(block_mem_size);
if (_recvr->exceeds_limit(0)) {
        // Note (yiguolei): this is tricky; if the running thread is a bthread,
        // the thread id obtained here may be wrong.
std::thread::id tid = std::this_thread::get_id();
MonotonicStopWatch monotonicStopWatch;
monotonicStopWatch.start();
auto iter = _local_closure.find(tid);
if (iter == _local_closure.end()) {
_local_closure.emplace(tid, new ThreadClosure);
iter = _local_closure.find(tid);
}
_pending_closures.emplace_back(iter->second.get(), monotonicStopWatch);
iter->second->wait(l);
}
}
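
// Records EOS from sender `be_number`; once the last sender has finished, wakes the
// consumer so it can observe *eos.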
void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
std::lock_guard<std::mutex> l(_lock);
if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) {
return;
}
_sender_eos_set.insert(be_number);
DCHECK_GT(_num_remaining_senders, 0);
_num_remaining_senders--;
_record_debug_info();
VLOG_FILE << "decremented senders: fragment_instance_id="
<< print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
<< " #senders=" << _num_remaining_senders;
if (_num_remaining_senders == 0) {
try_set_dep_ready_without_lock();
_data_arrival_cv.notify_one();
}
}
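
// Cancels the queue with `cancel_status`, wakes every thread waiting on data arrival,
// and runs all pending sender closures so their brpc responses are not leaked.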
void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
{
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return;
}
_is_cancelled = true;
_cancel_status = cancel_status;
try_set_dep_ready_without_lock();
VLOG_QUERY << "cancelled stream: _fragment_instance_id="
<< print_id(_recvr->fragment_instance_id())
<< " node_id=" << _recvr->dest_node_id();
}
// Wake up all threads waiting to produce/consume batches. They will all
// notice that the stream is cancelled and handle it.
_data_arrival_cv.notify_all();
// _data_removal_cv.notify_all();
// PeriodicCounterUpdater::StopTimeSeriesCounter(
// _recvr->_bytes_received_time_series_counter);
{
std::lock_guard<std::mutex> l(_lock);
for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
}
_pending_closures.clear();
}
}
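
// Closes the queue: marks it cancelled so concurrent adds become no-ops, runs any
// pending closures, and drops all queued blocks.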
void VDataStreamRecvr::SenderQueue::close() {
{
        // If _is_cancelled is not set to true, a concurrent sender may still add
        // batches to _block_queue; any batch added after _block_queue has been
        // cleared would be leaked.
std::lock_guard<std::mutex> l(_lock);
_is_cancelled = true;
try_set_dep_ready_without_lock();
for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
}
_pending_closures.clear();
}
// Delete any batches queued in _block_queue
_block_queue.clear();
}
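
// Creates one sender queue per sender when merging, otherwise a single queue shared
// by all senders, and registers the profile counters.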
VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state,
const RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
int num_senders, bool is_merging, RuntimeProfile* profile)
: HasTaskExecutionCtx(state),
_mgr(stream_mgr),
#ifdef USE_MEM_TRACKER
_query_mem_tracker(state->query_mem_tracker()),
_query_id(state->query_id()),
#endif
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
_row_desc(row_desc),
_is_merging(is_merging),
_is_closed(false),
_profile(profile),
_peak_memory_usage_counter(nullptr),
_enable_pipeline(state->enable_pipeline_exec()),
_mem_available(std::make_shared<bool>(true)) {
// DataStreamRecvr may be destructed after the instance execution thread ends.
_mem_tracker =
std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
if (state->enable_pipeline_x_exec()) {
_sender_to_local_channel_dependency.resize(num_queues);
for (size_t i = 0; i < num_queues; i++) {
_sender_to_local_channel_dependency[i] =
pipeline::LocalExchangeChannelDependency::create_shared(
_dest_node_id, _dest_node_id, state->get_query_ctx());
}
}
_sender_queues.reserve(num_queues);
int num_sender_per_queue = is_merging ? 1 : num_senders;
for (int i = 0; i < num_queues; ++i) {
SenderQueue* queue = nullptr;
if (_enable_pipeline) {
queue = _sender_queue_pool.add(new PipSenderQueue(this, num_sender_per_queue, profile));
if (state->enable_pipeline_x_exec()) {
queue->set_local_channel_dependency(_sender_to_local_channel_dependency[i]);
}
} else {
queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue, profile));
}
_sender_queues.push_back(queue);
}
// Initialize the counters
_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
_blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
_peak_memory_usage_counter =
_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
_bytes_received_counter = ADD_COUNTER(_profile, "BytesReceived", TUnit::BYTES);
_local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES);
_deserialize_row_batch_timer = ADD_TIMER(_profile, "DeserializeRowBatchTimer");
_data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime");
_buffer_full_total_timer = ADD_TIMER(_profile, "SendersBlockedTotalTimer(*)");
_first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime");
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
_decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
_rows_produced_counter = ADD_COUNTER(_profile, "RowsProduced", TUnit::UNIT);
_blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT);
}
VDataStreamRecvr::~VDataStreamRecvr() {
DCHECK(_mgr == nullptr) << "Must call close()";
}
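
// Builds a VSortedRunMerger over all sender queues; each queue supplies one input
// run through SenderQueue::get_batch.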
Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,
const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first, size_t batch_size,
int64_t limit, size_t offset) {
DCHECK(_is_merging);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
std::vector<BlockSupplier> child_block_suppliers;
    // Create the merger that will merge the sender queues into a single stream of
    // sorted rows.
_merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, limit,
offset, _profile));
for (int i = 0; i < _sender_queues.size(); ++i) {
child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch),
_sender_queues[i], std::placeholders::_1,
std::placeholders::_2));
}
_merger->set_pipeline_engine_enabled(_enable_pipeline);
RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
return Status::OK();
}
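
// Routes a serialized block from a remote sender to its queue: the per-sender queue
// when merging, otherwise queue 0.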
Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number,
int64_t packet_seq, ::google::protobuf::Closure** done) {
SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id, _fragment_instance_id);
int use_sender_id = _is_merging ? sender_id : 0;
return _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done);
}
void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
_mem_tracker->consume(block->allocated_bytes());
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->add_block(block, use_move);
}
bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
int use_sender_id = _is_merging ? sender_id : 0;
return _sender_queues[use_sender_id]->queue_empty();
}
std::shared_ptr<pipeline::LocalExchangeChannelDependency>
VDataStreamRecvr::get_local_channel_dependency(int sender_id) {
DCHECK(_sender_to_local_channel_dependency[_is_merging ? sender_id : 0] != nullptr);
return _sender_to_local_channel_dependency[_is_merging ? sender_id : 0];
}
bool VDataStreamRecvr::ready_to_read() {
for (const auto& queue : _sender_queues) {
if (queue->should_wait()) {
return false;
}
}
return true;
}
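
// Returns the next block, either straight from queue 0 or, when merging, from the
// sorted-run merger.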
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
Defer release_mem([&]() { _mem_tracker->release(block->allocated_bytes()); });
if (!_is_merging) {
block->clear();
return _sender_queues[0]->get_batch(block, eos);
} else {
return _merger->get_next(block, eos);
}
}
void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) {
if (!exec_status.ok()) {
cancel_stream(exec_status);
return;
}
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->decrement_senders(be_number);
}
void VDataStreamRecvr::cancel_stream(Status exec_status) {
VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id)
<< exec_status;
for (int i = 0; i < _sender_queues.size(); ++i) {
_sender_queues[i]->cancel(exec_status);
}
}
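
// Tracks the total memory held by queued blocks and clears or sets *_mem_available
// depending on whether the buffered bytes exceed config::exchg_node_buffer_size_bytes.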
void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
auto val = _blocks_memory_usage_current_value.fetch_add(size);
if (val + size > config::exchg_node_buffer_size_bytes) {
*_mem_available = false;
} else {
*_mem_available = true;
}
}
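
// Marks all local channel dependencies ready, closes every sender queue, and
// deregisters this receiver from the stream manager. Subsequent calls are no-ops.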
void VDataStreamRecvr::close() {
if (_is_closed) {
return;
}
_is_closed = true;
for (auto& it : _sender_to_local_channel_dependency) {
it->set_ready();
}
for (int i = 0; i < _sender_queues.size(); ++i) {
_sender_queues[i]->close();
}
// Remove this receiver from the DataStreamMgr that created it.
// TODO: log error msg
static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(), dest_node_id()));
_mgr = nullptr;
_merger.reset();
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
}
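
// Pipeline-engine variant of add_block for local senders: the block is copied or moved,
// materialized, and queued under _lock; unlike SenderQueue::add_block, it never blocks
// the calling thread for backpressure.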
void VDataStreamRecvr::PipSenderQueue::add_block(Block* block, bool use_move) {
if (block->rows() == 0) {
return;
}
{
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled) {
return;
}
}
BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name());
    // For local exchange, the block content must be copied when use_move == false.
if (use_move) {
block->clear();
} else {
auto rows = block->rows();
for (int i = 0; i < nblock->columns(); ++i) {
nblock->get_by_position(i).column =
nblock->get_by_position(i).column->clone_resized(rows);
}
}
materialize_block_inplace(*nblock);
auto block_mem_size = nblock->allocated_bytes();
{
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled) {
return;
}
_block_queue.emplace_back(std::move(nblock), block_mem_size);
_record_debug_info();
try_set_dep_ready_without_lock();
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
_recvr->update_blocks_memory_usage(block_mem_size);
_data_arrival_cv.notify_one();
}
}
} // namespace doris::vectorized