| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <gen_cpp/Types_types.h> |
| #include <gen_cpp/data.pb.h> |
| #include <glog/logging.h> |
| #include <google/protobuf/stubs/callback.h> |
| |
| #include <atomic> |
| #include <condition_variable> |
| #include <cstddef> |
| #include <cstdint> |
| #include <deque> |
| #include <list> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <sstream> |
| #include <thread> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/config.h" |
| #include "common/global_types.h" |
| #include "common/object_pool.h" |
| #include "common/status.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/task_execution_context.h" |
| #include "runtime/thread_context.h" |
| #include "runtime/workload_group/workload_group.h" |
| #include "util/runtime_profile.h" |
| #include "util/stopwatch.hpp" |
| #include "vec/core/block.h" |
| #include "vec/exprs/vexpr_fwd.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class MemTracker; |
| class PBlock; |
| class MemTrackerLimiter; |
| class RuntimeState; |
| |
| namespace pipeline { |
| class Dependency; |
| class ExchangeLocalState; |
| } // namespace pipeline |
| |
| namespace vectorized { |
| class VDataStreamMgr; |
| class VSortedRunMerger; |
| |
| class VDataStreamRecvr; |
| |
// Receiving end of an exchange data stream: one instance per (fragment instance,
// destination exchange node), fed by `num_senders` senders.
//
// Blocks arrive on two paths:
//   * remote: serialized PBlocks via add_block(std::unique_ptr<PBlock>, ...) / add_blocks(),
//     each accompanied by an RPC closure (`done`);
//   * local: in-memory Blocks via add_block(Block*, int, bool).
// They are buffered in SenderQueues (allocated from `_sender_queue_pool`) and consumed through
// get_next(). For a merging receiver, create_merger() builds the VSortedRunMerger kept in
// `_merger` (presumably merging the per-sender sorted runs -- TODO confirm in the .cpp).
class VDataStreamRecvr : public HasTaskExecutionCtx {
public:
    class SenderQueue;

    VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeProfile::HighWaterMarkCounter* counter,
                     RuntimeState* state, const TUniqueId& fragment_instance_id,
                     PlanNodeId dest_node_id, int num_senders, bool is_merging,
                     RuntimeProfile* profile, size_t data_queue_capacity);

    ~VDataStreamRecvr() override;

    // Builds the sorted-run merger from the ordering expressions, per-key sort direction,
    // per-key null ordering, output batch size, limit and offset.
    MOCK_FUNCTION Status create_merger(const VExprContextSPtrs& ordering_expr,
                                       const std::vector<bool>& is_asc_order,
                                       const std::vector<bool>& nulls_first, size_t batch_size,
                                       int64_t limit, size_t offset);

    // Returns a copy of the sender-queue pointers. The queues are owned by
    // `_sender_queue_pool`; callers must not delete them.
    std::vector<SenderQueue*> sender_queues() const { return _sender_queues; }

    // Remote path: enqueues one serialized block coming from `sender_id` (backend `be_number`,
    // packet sequence `packet_seq`). `done` is the RPC closure for this request;
    // `wait_for_worker` and `time_to_find_recvr` are caller-measured timings.
    Status add_block(std::unique_ptr<PBlock> pblock, int sender_id, int be_number,
                     int64_t packet_seq, ::google::protobuf::Closure** done,
                     const int64_t wait_for_worker, const uint64_t time_to_find_recvr);

    // Remote path: enqueues all blocks carried by `request`.
    Status add_blocks(const PTransmitDataParams* request, ::google::protobuf::Closure** done,
                      const int64_t wait_for_worker, const uint64_t time_to_find_recvr);

    // Local path: enqueues an in-memory block from `sender_id`. `use_move` presumably selects
    // moving the block's contents instead of copying them -- TODO confirm in the .cpp.
    void add_block(Block* block, int sender_id, bool use_move);

    std::string debug_string();

    // Produces the next output block into `block`; `*eos` reports end of stream.
    MOCK_FUNCTION Status get_next(Block* block, bool* eos);

    const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
    PlanNodeId dest_node_id() const { return _dest_node_id; }

    // Indicate that a particular sender is done. Delegated to the appropriate
    // sender queue. Called from DataStreamMgr.
    void remove_sender(int sender_id, int be_number, Status exec_status);

    void cancel_stream(Status exec_status);

    MOCK_FUNCTION void close();

    // Called when the source reaches eos = true.
    void set_sink_dep_always_ready() const;

    // Careful: the stream sender calls this function for a local receiver, so
    // accessing members of the receiver that are allocated by the object pool
    // inside this function is not safe.
    MOCK_FUNCTION bool exceeds_limit(size_t block_byte_size);
    bool queue_exceeds_limit(size_t byte_size) const;
    bool is_closed() const { return _is_closed; }

    std::shared_ptr<pipeline::Dependency> get_local_channel_dependency(int sender_id);

    // Shrinks the sender-queue memory limit to LOW_MEMORY_MODE_SENDER_QUEUE_MEM_LIMIT.
    void set_low_memory_mode() {
        _sender_queue_mem_limit = LOW_MEMORY_MODE_SENDER_QUEUE_MEM_LIMIT;
    }

private:
    friend struct BlockSupplierSortCursorImpl;

    // Sender-queue memory limit applied by set_low_memory_mode(): 1 MiB.
    static constexpr size_t LOW_MEMORY_MODE_SENDER_QUEUE_MEM_LIMIT = 1024 * 1024;

    // DataStreamMgr instance used to create this recvr. (Not owned)
    VDataStreamMgr* _mgr = nullptr;

    RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;

    std::shared_ptr<ResourceContext> _resource_ctx;

    std::shared_ptr<QueryContext> _query_context;

    // Fragment and node id of the destination exchange node this receiver is used by.
    TUniqueId _fragment_instance_id;
    PlanNodeId _dest_node_id;

    // Row schema, copied from the creator of this receiver.
    RowDescriptor _row_desc;

    // True if this receiver merges incoming rows from different senders. Per-sender
    // row batch queues are maintained in this case.
    bool _is_merging = false;
    bool _is_closed = false;

    std::unique_ptr<MemTracker> _mem_tracker;
    // Managed by object pool
    std::vector<SenderQueue*> _sender_queues;

    // Explicitly initialized: before C++20 a default-constructed std::atomic holds an
    // indeterminate value.
    std::atomic<size_t> _sender_queue_mem_limit {0};

    std::unique_ptr<VSortedRunMerger> _merger;

    ObjectPool _sender_queue_pool;
    RuntimeProfile* _profile = nullptr;

    RuntimeProfile::Counter* _remote_bytes_received_counter = nullptr;
    RuntimeProfile::Counter* _local_bytes_received_counter = nullptr;
    RuntimeProfile::Counter* _deserialize_row_batch_timer = nullptr;
    RuntimeProfile::Counter* _first_batch_wait_total_timer = nullptr;
    // Accumulated by SenderQueue::BlockItem::call_done() with the time a deferred RPC
    // closure waited before being run.
    RuntimeProfile::Counter* _buffer_full_total_timer = nullptr;
    RuntimeProfile::Counter* _data_arrival_timer = nullptr;
    RuntimeProfile::Counter* _decompress_timer = nullptr;
    RuntimeProfile::Counter* _decompress_bytes = nullptr;

    // Number of blocks received
    RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
    RuntimeProfile::Counter* _max_wait_worker_time = nullptr;
    // Maximum single wait recorded by SenderQueue::BlockItem::call_done().
    RuntimeProfile::Counter* _max_wait_to_process_time = nullptr;
    RuntimeProfile::Counter* _max_find_recvr_time = nullptr;

    std::vector<std::shared_ptr<pipeline::Dependency>> _sender_to_local_channel_dependency;
};
| |
| class VDataStreamRecvr::SenderQueue { |
| public: |
| SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders, |
| std::shared_ptr<pipeline::Dependency> local_channel_dependency); |
| |
| ~SenderQueue(); |
| |
| Status get_batch(Block* next_block, bool* eos); |
| |
| Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t packet_seq, |
| ::google::protobuf::Closure** done, const int64_t wait_for_worker, |
| const uint64_t time_to_find_recvr); |
| |
| Status add_blocks(const PTransmitDataParams* request, ::google::protobuf::Closure** done, |
| const int64_t wait_for_worker, const uint64_t time_to_find_recvr); |
| |
| std::string debug_string(); |
| |
| void add_block(Block* block, bool use_move); |
| |
| void decrement_senders(int sender_id); |
| |
| void cancel(Status cancel_status); |
| |
| void close(); |
| |
| void set_dependency(std::shared_ptr<pipeline::Dependency> dependency) { |
| _source_dependency = dependency; |
| } |
| |
| protected: |
| void add_blocks_memory_usage(int64_t size); |
| |
| void sub_blocks_memory_usage(int64_t size); |
| |
| bool exceeds_limit(); |
| friend class pipeline::ExchangeLocalState; |
| |
| void set_source_ready(std::lock_guard<std::mutex>&); |
| |
| // Not managed by this class |
| VDataStreamRecvr* _recvr = nullptr; |
| std::mutex _lock; |
| bool _is_cancelled; |
| Status _cancel_status; |
| int _num_remaining_senders; |
| std::unique_ptr<MemTracker> _queue_mem_tracker; |
| |
| // `BlockItem` is used in `_block_queue` to handle both local and remote exchange blocks. |
| // For local exchange blocks, `BlockUPtr` is used directly without any modification. |
| // For remote exchange blocks, the `pblock` is stored in `BlockItem`. |
| // When `getBlock` is called, the `pblock` is deserialized into a usable block. |
| struct BlockItem { |
| Status get_block(BlockUPtr& block) { |
| if (!_block) { |
| DCHECK(_pblock); |
| SCOPED_RAW_TIMER(&_deserialize_time); |
| _block = Block::create_unique(); |
| RETURN_IF_ERROR_OR_CATCH_EXCEPTION( |
| _block->deserialize(*_pblock, &_decompress_bytes, &_decompress_time)); |
| } |
| block.swap(_block); |
| _block.reset(); |
| return Status::OK(); |
| } |
| |
| size_t block_byte_size() const { return _block_byte_size; } |
| int64_t deserialize_time() const { return _deserialize_time; } |
| int64_t decompress_time() const { return _decompress_time; } |
| size_t decompress_bytes() const { return _decompress_bytes; } |
| BlockItem() = default; |
| BlockItem(BlockUPtr&& block, size_t block_byte_size) |
| : _block(std::move(block)), _block_byte_size(block_byte_size) {} |
| |
| BlockItem(std::unique_ptr<PBlock>&& pblock, size_t block_byte_size) |
| : _block(nullptr), _pblock(std::move(pblock)), _block_byte_size(block_byte_size) {} |
| |
| void set_done(google::protobuf::Closure* done) { |
| // The done callback is only set when the queue memory limit is exceeded. |
| _done_cb = done; |
| _wait_timer.start(); |
| } |
| |
| void call_done(VDataStreamRecvr* recvr) { |
| if (_done_cb != nullptr) { |
| _done_cb->Run(); |
| _done_cb = nullptr; |
| _wait_timer.stop(); |
| int64_t elapse_time = _wait_timer.elapsed_time(); |
| if (recvr->_max_wait_to_process_time->value() < elapse_time) { |
| recvr->_max_wait_to_process_time->set(elapse_time); |
| } |
| recvr->_buffer_full_total_timer->update(elapse_time); |
| } |
| } |
| |
| private: |
| BlockUPtr _block; |
| std::unique_ptr<PBlock> _pblock; |
| size_t _block_byte_size = 0; |
| int64_t _deserialize_time = 0; |
| int64_t _decompress_time = 0; |
| size_t _decompress_bytes = 0; |
| |
| google::protobuf::Closure* _done_cb = nullptr; |
| MonotonicStopWatch _wait_timer; |
| }; |
| |
| std::list<BlockItem> _block_queue; |
| |
| // sender_id |
| std::unordered_set<int> _sender_eos_set; |
| // be_number => packet_seq |
| std::unordered_map<int, int64_t> _packet_seq_map; |
| |
| std::shared_ptr<pipeline::Dependency> _source_dependency; |
| std::shared_ptr<pipeline::Dependency> _local_channel_dependency; |
| }; |
| |
| } // namespace vectorized |
| } // namespace doris |
| |
| #include "common/compile_check_end.h" |