blob: f0c5476fd800295d8a121c9a41a425f2c9660c53 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdint.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <mutex>
#include "common/status.h"
#include "exec/exchange/exchange_writer.h"
#include "exec/exchange/vdata_stream_sender.h"
#include "exec/operator/exchange_sink_buffer.h"
#include "exec/operator/operator.h"
namespace doris {
#include "common/compile_check_begin.h"
// Forward declarations: within this header these types are only named through
// pointers (RuntimeState*) or references (const TDataSink&).
class RuntimeState;
class TDataSink;
/// Local state of the exchange sink operator.
///
/// Holds the outgoing channels, the block serializer, the partitioner/writer
/// pair, the profile counters handed out to collaborators, and the
/// dependencies reported through dependencies() / finishdependency().
/// init(), open(), close() and the other non-inline members are defined out
/// of line.
class ExchangeSinkLocalState MOCK_REMOVE(final) : public PipelineXSinkLocalState<> {
    ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
    using Base = PipelineXSinkLocalState<>;

public:
    /// Builds the local state for `parent` and creates the finish dependency,
    /// named "<parent name>_FINISH_DEPENDENCY".
    /// NOTE(review): the trailing `false` is presumably the initial "ready"
    /// flag of the dependency -- confirm against Dependency's constructor.
    ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
            : Base(parent, state), _serializer(this) {
        _finish_dependency =
                std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
                                             parent->get_name() + "_FINISH_DEPENDENCY", false);
    }

#ifdef BE_TEST
    /// Test-only constructor: there is no parent operator, so the profiles and
    /// the memory counter are created here and registered in the state's
    /// object pool.
    ExchangeSinkLocalState(RuntimeState* state) : Base(nullptr, state) {
        _operator_profile = state->obj_pool()->add(new RuntimeProfile("OperatorProfile"));
        _custom_profile = state->obj_pool()->add(new RuntimeProfile("CustomCounters"));
        _common_profile = state->obj_pool()->add(new RuntimeProfile("CommonCounters"));
        _operator_profile->add_child(_custom_profile, true);
        _operator_profile->add_child(_common_profile, true);
        _memory_used_counter =
                _common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
    }
#endif

    /// Returns the queue dependency (when set) followed by every local-channel
    /// dependency, as non-owning pointers.
    std::vector<Dependency*> dependencies() const override {
        std::vector<Dependency*> dep_vec;
        // At most one queue dependency plus one entry per local channel.
        dep_vec.reserve(_local_channels_dependency.size() + 1);
        if (_queue_dependency) {
            dep_vec.push_back(_queue_dependency.get());
        }
        // Iterate by const reference: copying a shared_ptr costs an atomic
        // refcount increment/decrement per element for no benefit here.
        for (const auto& dep : _local_channels_dependency) {
            dep_vec.push_back(dep.get());
        }
        return dep_vec;
    }

    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
    Status open(RuntimeState* state) override;
    Status close(RuntimeState* state, Status exec_status) override;

    /// Non-owning pointer to the finish dependency created in the constructor
    /// (nullptr for the BE_TEST constructor, which does not create one).
    Dependency* finishdependency() override { return _finish_dependency.get(); }

    void register_channels(ExchangeSinkBuffer* buffer);

    // Profile counter accessors. Each returns the raw counter pointer, which is
    // nullptr until the counter has been assigned (not done in this header).
    RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
    RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; }
    RuntimeProfile::Counter* local_bytes_send_counter() { return _local_bytes_send_counter; }
    RuntimeProfile::Counter* local_sent_rows() { return _local_sent_rows; }
    RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; }

    [[nodiscard]] bool transfer_large_data_by_brpc() const;

    /// The sink reports itself finished once set_reach_limit() has been called.
    bool is_finished() const override { return _reach_limit.load(); }
    void set_reach_limit() { _reach_limit = true; }

    // sender_id indicates which instance within a fragment, while be_number indicates which instance
    // across all fragments. For example, with 3 BEs and 8 instances, the range of sender_id would be 0 to 24,
    // and the range of be_number would be from n + 0 to n + 24.
    // Since be_number is a required field, it still needs to be set for compatibility with older code.
    // NOTE(review): 3 * 8 = 24 instances would normally give zero-based ids 0..23; confirm whether
    // the upper bound quoted above is meant to be exclusive.
    [[nodiscard]] int sender_id() const { return _sender_id; }
    [[nodiscard]] int be_number() const { return _state->be_number(); }

    std::string name_suffix() override;
    segment_v2::CompressionTypePB compression_type() const;
    std::string debug_string(int indentation_level) const override;

    RuntimeProfile::Counter* send_new_partition_timer() { return _send_new_partition_timer; }
    RuntimeProfile::Counter* add_partition_request_timer() { return _add_partition_request_timer; }
    RuntimeProfile::Counter* split_block_hash_compute_timer() {
        return _split_block_hash_compute_timer;
    }
    RuntimeProfile::Counter* distribute_rows_into_channels_timer() {
        return _distribute_rows_into_channels_timer;
    }

    // Outgoing channels of this sink instance.
    std::vector<std::shared_ptr<Channel>> channels;
    // NOTE(review): presumably the positions in `channels` that are local channels -- confirm.
    std::vector<int> local_channel_ids;
    int current_channel_idx {0}; // index of current channel to send to if _random == true
    bool _only_local_exchange {false};

    void on_channel_finished(InstanceLoId channel_id);

    /// Non-owning pointer to the partitioner; nullptr while none has been created.
    PartitionerBase* partitioner() const { return _partitioner.get(); }

private:
    friend class ExchangeSinkOperatorX;
    friend class Channel;
    friend class BlockSerializer;

    MOCK_FUNCTION void _create_channels();

    std::shared_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;

    RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
    RuntimeProfile::Counter* _compress_timer = nullptr;
    RuntimeProfile::Counter* _bytes_sent_counter = nullptr;
    RuntimeProfile::Counter* _uncompressed_bytes_counter = nullptr;
    RuntimeProfile::Counter* _local_sent_rows = nullptr;
    RuntimeProfile::Counter* _local_send_timer = nullptr;
    RuntimeProfile::Counter* _split_block_hash_compute_timer = nullptr;
    RuntimeProfile::Counter* _distribute_rows_into_channels_timer = nullptr;
    RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
    // Throughput per total time spent in sender
    RuntimeProfile::Counter* _overall_throughput = nullptr;
    // Used to counter send bytes under local data exchange
    RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
    RuntimeProfile::Counter* _merge_block_timer = nullptr;
    RuntimeProfile::Counter* _send_new_partition_timer = nullptr;
    RuntimeProfile::Counter* _wait_queue_timer = nullptr;
    RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
    std::vector<RuntimeProfile::Counter*> _wait_channel_timer;

    // Sender instance id, unique within a fragment.
    // Default-initialized so sender_id() never reads an indeterminate value
    // before the real id has been assigned.
    int _sender_id = 0;

    std::shared_ptr<BroadcastPBlockHolderMemLimiter> _broadcast_pb_mem_limiter;
    size_t _rpc_channels_num = 0;
    BlockSerializer _serializer;

    std::shared_ptr<Dependency> _queue_dependency = nullptr;

    /**
     * We use this to control the execution for local exchange.
     * +---------------+ +---------------+ +---------------+
     * | ExchangeSink1 | | ExchangeSink2 | | ExchangeSink3 |
     * +---------------+ +---------------+ +---------------+
     * | | |
     * | +----------------------------+----------------------------------+ |
     * +----+------------------|------------------------------------------+ | |
     * | | +------------------------|--------------------|------------+-----+
     * Dependency 1-1 | Dependency 2-1 | Dependency 3-1 | Dependency 1-2 | Dependency 2-2 | Dependency 3-2 |
     * +----------------------------------------------+ +----------------------------------------------+
     * | queue1 queue2 queue3 | | queue1 queue2 queue3 |
     * | LocalRecvr | | LocalRecvr |
     * +----------------------------------------------+ +----------------------------------------------+
     * +-----------------+ +------------------+
     * | ExchangeSource1 | | ExchangeSource2 |
     * +-----------------+ +------------------+
     *
     * NOTE(review): "Dependency i-j" reads as the dependency between
     * ExchangeSink i and LocalRecvr j -- confirm against the channel setup.
     */
    std::vector<std::shared_ptr<Dependency>> _local_channels_dependency;

    std::unique_ptr<PartitionerBase> _partitioner;
    std::unique_ptr<ExchangeWriterBase> _writer;
    // Default-initialized so it is never read as an indeterminate value.
    size_t _partition_count = 0;

    std::shared_ptr<Dependency> _finish_dependency;

    // for shuffle data by partition and tablet
    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;

    // Value-initialized (enumerator with value 0) until the real partition
    // type is assigned outside this header.
    TPartitionType::type _part_type {};

    // Backs is_finished(); set by set_reach_limit().
    std::atomic<bool> _reach_limit = false;
    int _last_local_channel_idx = -1;

    std::atomic_int _working_channels_count = 0;
    std::set<InstanceLoId> _finished_channels;
    // NOTE(review): presumably guards `_finished_channels` -- confirm in on_channel_finished().
    std::mutex _finished_channels_mutex;
};
/// Exchange sink operator: the operator-level counterpart of
/// ExchangeSinkLocalState. It keeps the sink description it was constructed
/// with (destinations, partition type, tablet-sink parameters, ...) and can
/// hand out sink buffers to local states via get_sink_buffer().
class ExchangeSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<ExchangeSinkLocalState> {
public:
    ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
                          const TDataStreamSink& sink,
                          const std::vector<TPlanFragmentDestination>& destinations,
                          const std::vector<TUniqueId>& fragment_instance_ids);

    Status init(const TDataSink& tsink) override;

    // The state is from pipeline fragment context, it will be saved in ExchangeSinkOperator
    // and it will be passed to exchange sink buffer. So that exchange sink buffer should not
    // be used after pipeline fragment ctx. All operations in Exchange Sink Buffer should hold
    // TaskExecutionContext.
    Status prepare(RuntimeState* state) override;

    Status sink(RuntimeState* state, Block* in_block, bool eos) override;

    /// This operator always reports itself as serial.
    bool is_serial_operator() const override { return true; }

    /// Propagates low-memory mode to everything the local state of `state`
    /// holds: the sink buffer and broadcast limiter (each only when present),
    /// the serializer, and every channel.
    void set_low_memory_mode(RuntimeState* state) override {
        auto& sink_state = get_local_state(state);

        // The sink buffer is nullptr when `sink_state._only_local_exchange` is set.
        if (sink_state._sink_buffer != nullptr) {
            sink_state._sink_buffer->set_low_memory_mode();
        }

        if (sink_state._broadcast_pb_mem_limiter != nullptr) {
            sink_state._broadcast_pb_mem_limiter->set_low_memory_mode();
        }

        sink_state._serializer.set_low_memory_mode(state);

        for (auto& ch : sink_state.channels) {
            ch->set_low_memory_mode(state);
        }
    }

    // For a normal shuffle scenario, if the concurrency is n,
    // there can be up to n * n RPCs in the current fragment.
    // Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs.
    // (Note: This does not reduce the total number of RPCs.)
    // In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed.
    std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(RuntimeState* state,
                                                        InstanceLoId sender_ins_id);

    /// Mutable access to the tablet-sink expression contexts.
    VExprContextSPtrs& tablet_sink_expr_ctxs() { return _tablet_sink_expr_ctxs; }

private:
    friend class ExchangeSinkLocalState;

    MOCK_FUNCTION void _init_sink_buffer();

    template <typename ChannelPtrType>
    Status _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);

    // Use ExchangeSinkOperatorX to create a sink buffer.
    // The sink buffer can be shared among multiple ExchangeSinkLocalState instances,
    // or each ExchangeSinkLocalState can have its own sink buffer.
    std::shared_ptr<ExchangeSinkBuffer> _create_buffer(
            RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids);

    // Data members below keep their declaration order on purpose: it determines
    // the order in which the out-of-line constructor initializes them.
    std::shared_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;

    RuntimeState* _state = nullptr;

    const std::vector<TExpr> _texprs;
    TMergePartitionInfo _merge_partition_info;
    bool _has_merge_partition_info = false;

    const RowDescriptor& _row_desc;
    TTupleId _output_tuple_id = -1;

    TPartitionType::type _part_type;

    // serialized batches for broadcasting; we need two so we can write
    // one while the other one is still being sent
    PBlock _pb_block1;
    PBlock _pb_block2;

    const std::vector<TPlanFragmentDestination> _dests;

    // Identifier of the destination plan node.
    const PlanNodeId _dest_node_id;

    // User can change this config at runtime, avoid it being modified during query or loading process.
    bool _transfer_large_data_by_brpc = false;

    segment_v2::CompressionTypePB _compression_type;

    // for tablet sink shuffle
    const TOlapTableSchemaParam _tablet_sink_schema;
    const TOlapTablePartitionParam _tablet_sink_partition;
    const TOlapTableLocationParam _tablet_sink_location;
    const TTupleId _tablet_sink_tuple_id;
    int64_t _tablet_sink_txn_id = -1;
    std::shared_ptr<ObjectPool> _pool;
    VExprContextSPtrs _tablet_sink_expr_ctxs;
    const std::vector<TExpr>* _t_tablet_sink_exprs = nullptr;

    // for external table sink random partition
    // Control the number of channels according to the flow, thereby controlling the number of table sink writers.
    size_t _data_processed = 0;
    int _writer_count = 1;

    // If dest_is_merge is true, it indicates that the corresponding receiver is a VMERGING-EXCHANGE.
    // The receiver will sort the collected data, so the sender must ensure that the data sent is ordered.
    const bool _dest_is_merge;

    const std::vector<TUniqueId>& _fragment_instance_ids;
};
#include "common/compile_check_end.h"
} // namespace doris