| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <atomic> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include "core/block/block.h" |
| #include "exec/exchange/vdata_stream_sender.h" |
| #include "exec/pipeline/dependency.h" |
| #include "exec/spill/spill_file.h" |
| #include "runtime/runtime_profile.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| |
| class Dependency; |
| struct MultiCastSharedState; |
| |
| struct MultiCastBlock { |
| MultiCastBlock(Block* block, int need_copy, size_t mem_size); |
| |
| std::unique_ptr<Block> _block; |
| // Each block is copied during pull. If _un_finish_copy == 0, |
| // it indicates that this block has been fully used and can be released. |
| int _un_finish_copy; |
| size_t _mem_size; |
| }; |
| |
| struct SpillingReader { |
| SpillFileReaderSPtr reader; |
| SpillFileSPtr spill_file; |
| |
| int64_t block_offset {0}; |
| bool all_data_read {false}; |
| }; |
| |
| // TDOD: MultiCastDataStreamer same as the data queue, maybe rethink union and refactor the |
| // code |
| class MultiCastDataStreamer { |
| public: |
| MultiCastDataStreamer(ObjectPool* pool, int cast_sender_count, int32_t node_id) |
| : _profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))), |
| _cached_blocks(cast_sender_count), |
| _cast_sender_count(cast_sender_count), |
| _node_id(node_id), |
| _spill_readers(cast_sender_count), |
| _source_operator_profiles(cast_sender_count) { |
| _sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end()); |
| _dependencies.resize(cast_sender_count, nullptr); |
| |
| _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES); |
| _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT); |
| }; |
| |
| ~MultiCastDataStreamer() = default; |
| |
| Status pull(RuntimeState* state, int sender_idx, Block* block, bool* eos); |
| |
| Status push(RuntimeState* state, Block* block, bool eos); |
| |
| RuntimeProfile* profile() { return _profile; } |
| |
| void set_dep_by_sender_idx(int sender_idx, Dependency* dep) { |
| _dependencies[sender_idx] = dep; |
| _block_reading(sender_idx); |
| } |
| |
| void set_write_dependency(Dependency* dependency) { _write_dependency = dependency; } |
| |
| void set_sink_profile(RuntimeProfile* profile) { _sink_operator_profile = profile; } |
| |
| void set_source_profile(int sender_idx, RuntimeProfile* profile) { |
| _source_operator_profiles[sender_idx] = profile; |
| } |
| |
| std::string debug_string(); |
| |
| private: |
| void _set_ready_for_read(int sender_idx); |
| void _block_reading(int sender_idx); |
| |
| Status _copy_block(RuntimeState* state, int32_t sender_idx, Block* block, |
| MultiCastBlock& multi_cast_block); |
| |
| Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file); |
| |
| Status _trigger_spill_if_need(RuntimeState* state, bool* triggered); |
| |
| RuntimeProfile* _profile = nullptr; |
| std::list<MultiCastBlock> _multi_cast_blocks; |
| std::vector<std::vector<Block>> _cached_blocks; |
| std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read; |
| std::mutex _mutex; |
| bool _eos = false; |
| int _cast_sender_count = 0; |
| int _node_id; |
| std::atomic_int64_t _cumulative_mem_size = 0; |
| std::atomic_int64_t _copying_count = 0; |
| RuntimeProfile::Counter* _process_rows = nullptr; |
| RuntimeProfile::Counter* _peak_mem_usage = nullptr; |
| |
| Dependency* _write_dependency; |
| std::vector<Dependency*> _dependencies; |
| |
| BlockUPtr _pending_block; |
| |
| std::vector<std::vector<std::shared_ptr<SpillingReader>>> _spill_readers; |
| |
| RuntimeProfile* _sink_operator_profile; |
| // operator_profile of each source operator |
| std::vector<RuntimeProfile*> _source_operator_profiles; |
| }; |
| #include "common/compile_check_end.h" |
| } // namespace doris |