blob: b92aeadc8b1f3a7bb224d371ccebad1361a6d9e8 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <ostream>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/global_types.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/materialize_block.h"
#include "vec/exprs/vexpr_fwd.h"
namespace doris {
class MemTracker;
class PBlock;
class MemTrackerLimiter;
class RuntimeState;
namespace vectorized {
class VDataStreamMgr;
class VSortedRunMerger;
class VDataStreamRecvr {
public:
VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
int num_senders, bool is_merging, RuntimeProfile* profile);
virtual ~VDataStreamRecvr();
Status create_merger(const VExprContextSPtrs& ordering_expr,
const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first, size_t batch_size, int64_t limit,
size_t offset);
Status add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done);
void add_block(Block* block, int sender_id, bool use_move);
bool sender_queue_empty(int sender_id);
bool ready_to_read();
Status get_next(Block* block, bool* eos);
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
PlanNodeId dest_node_id() const { return _dest_node_id; }
const RowDescriptor& row_desc() const { return _row_desc; }
// Indicate that a particular sender is done. Delegated to the appropriate
// sender queue. Called from DataStreamMgr.
void remove_sender(int sender_id, int be_number);
// We need msg to make sure we can pass existing regression test.
void cancel_stream(const std::string& msg = "");
void close();
// Careful: stream sender will call this function for a local receiver,
// accessing members of receiver that are allocated by Object pool
// in this function is not safe.
bool exceeds_limit(int batch_size) {
return _blocks_memory_usage_current_value + batch_size >
config::exchg_node_buffer_size_bytes;
}
bool is_closed() const { return _is_closed; }
private:
void update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
_blocks_memory_usage_current_value = _blocks_memory_usage->current_value();
}
class SenderQueue;
class PipSenderQueue;
friend struct BlockSupplierSortCursorImpl;
// DataStreamMgr instance used to create this recvr. (Not owned)
VDataStreamMgr* _mgr;
#ifdef USE_MEM_TRACKER
std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
TUniqueId _query_id;
#endif
// Fragment and node id of the destination exchange node this receiver is used by.
TUniqueId _fragment_instance_id;
PlanNodeId _dest_node_id;
// Row schema, copied from the caller of CreateRecvr().
RowDescriptor _row_desc;
// True if this reciver merges incoming rows from different senders. Per-sender
// row batch queues are maintained in this case.
bool _is_merging;
bool _is_closed;
std::unique_ptr<MemTracker> _mem_tracker;
// Managed by object pool
std::vector<SenderQueue*> _sender_queues;
std::unique_ptr<VSortedRunMerger> _merger;
ObjectPool _sender_queue_pool;
RuntimeProfile* _profile;
RuntimeProfile::Counter* _bytes_received_counter;
RuntimeProfile::Counter* _local_bytes_received_counter;
RuntimeProfile::Counter* _deserialize_row_batch_timer;
RuntimeProfile::Counter* _first_batch_wait_total_timer;
RuntimeProfile::Counter* _buffer_full_total_timer;
RuntimeProfile::Counter* _data_arrival_timer;
RuntimeProfile::Counter* _decompress_timer;
RuntimeProfile::Counter* _decompress_bytes;
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
std::atomic<int64_t> _blocks_memory_usage_current_value = 0;
RuntimeProfile::Counter* _peak_memory_usage_counter;
// Number of rows received
RuntimeProfile::Counter* _rows_produced_counter;
// Number of blocks received
RuntimeProfile::Counter* _blocks_produced_counter;
bool _enable_pipeline;
};
class ThreadClosure : public google::protobuf::Closure {
public:
void Run() override { _cv.notify_one(); }
void wait(std::unique_lock<std::mutex>& lock) { _cv.wait(lock); }
private:
std::condition_variable _cv;
};
class VDataStreamRecvr::SenderQueue {
public:
SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile);
virtual ~SenderQueue();
virtual bool should_wait();
virtual Status get_batch(Block* next_block, bool* eos);
Status add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done);
virtual void add_block(Block* block, bool use_move);
void decrement_senders(int sender_id);
void cancel(const std::string& msg = "");
void close();
bool queue_empty() {
std::unique_lock<std::mutex> l(_lock);
return _block_queue.empty();
}
protected:
Status _inner_get_batch_without_lock(Block* block, bool* eos);
// Not managed by this class
VDataStreamRecvr* _recvr;
std::mutex _lock;
bool _is_cancelled;
std::string _cancel_msg = "";
int _num_remaining_senders;
std::condition_variable _data_arrival_cv;
std::condition_variable _data_removal_cv;
std::list<std::pair<BlockUPtr, size_t>> _block_queue;
bool _received_first_batch;
// sender_id
std::unordered_set<int> _sender_eos_set;
// be_number => packet_seq
std::unordered_map<int, int64_t> _packet_seq_map;
std::deque<std::pair<google::protobuf::Closure*, MonotonicStopWatch>> _pending_closures;
std::unordered_map<std::thread::id, std::unique_ptr<ThreadClosure>> _local_closure;
};
class VDataStreamRecvr::PipSenderQueue : public SenderQueue {
public:
PipSenderQueue(VDataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile)
: SenderQueue(parent_recvr, num_senders, profile) {}
Status get_batch(Block* block, bool* eos) override {
std::lock_guard<std::mutex> l(_lock); // protect _block_queue
DCHECK(_is_cancelled || !_block_queue.empty() || _num_remaining_senders == 0)
<< " _is_cancelled: " << _is_cancelled
<< ", _block_queue_empty: " << _block_queue.empty()
<< ", _num_remaining_senders: " << _num_remaining_senders;
return _inner_get_batch_without_lock(block, eos);
}
void add_block(Block* block, bool use_move) override {
if (block->rows() == 0) return;
{
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled) {
return;
}
}
BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name());
// local exchange should copy the block contented if use move == false
if (use_move) {
block->clear();
} else {
auto rows = block->rows();
for (int i = 0; i < nblock->columns(); ++i) {
nblock->get_by_position(i).column =
nblock->get_by_position(i).column->clone_resized(rows);
}
}
materialize_block_inplace(*nblock);
auto block_mem_size = nblock->allocated_bytes();
{
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled) {
return;
}
_block_queue.emplace_back(std::move(nblock), block_mem_size);
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
_recvr->update_blocks_memory_usage(block_mem_size);
_data_arrival_cv.notify_one();
}
}
};
} // namespace vectorized
} // namespace doris