| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/sink/writer/vtablet_writer.h" |
| |
| #include <brpc/http_method.h> |
| #include <bthread/bthread.h> |
| #include <fmt/format.h> |
| #include <gen_cpp/DataSinks_types.h> |
| #include <gen_cpp/Descriptors_types.h> |
| #include <gen_cpp/Exprs_types.h> |
| #include <gen_cpp/FrontendService.h> |
| #include <gen_cpp/FrontendService_types.h> |
| #include <gen_cpp/HeartbeatService_types.h> |
| #include <gen_cpp/Metrics_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <gen_cpp/data.pb.h> |
| #include <gen_cpp/internal_service.pb.h> |
| #include <glog/logging.h> |
| #include <google/protobuf/stubs/common.h> |
| #include <sys/param.h> |
| |
| #include <algorithm> |
| #include <initializer_list> |
| #include <memory> |
| #include <mutex> |
| #include <sstream> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include "cloud/config.h" |
| #include "common/config.h" |
| #include "core/data_type/data_type.h" |
| #include "cpp/sync_point.h" |
| #include "exec/sink/vrow_distribution.h" |
| #include "exprs/vexpr_fwd.h" |
| #include "runtime/runtime_profile.h" |
| |
| #ifdef DEBUG |
| #include <unordered_set> |
| #endif |
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/logging.h" |
| #include "common/metrics/doris_metrics.h" |
| #include "common/object_pool.h" |
| #include "common/signal_handler.h" |
| #include "common/status.h" |
| #include "core/block/block.h" |
| #include "core/column/column.h" |
| #include "core/column/column_const.h" |
| #include "core/data_type/data_type_nullable.h" |
| #include "exec/sink/vtablet_block_convertor.h" |
| #include "exec/sink/vtablet_finder.h" |
| #include "exprs/vexpr.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/memory/memory_reclamation.h" |
| #include "runtime/query_context.h" |
| #include "runtime/runtime_state.h" |
| #include "runtime/thread_context.h" |
| #include "service/backend_options.h" |
| #include "storage/tablet_info.h" |
| #include "util/brpc_closure.h" |
| #include "util/debug_points.h" |
| #include "util/defer_op.h" |
| #include "util/mem_info.h" |
| #include "util/network_util.h" |
| #include "util/proto_util.h" |
| #include "util/threadpool.h" |
| #include "util/thrift_rpc_helper.h" |
| #include "util/thrift_util.h" |
| #include "util/time.h" |
| #include "util/uid_util.h" |
| |
| namespace doris { |
| class TExpr; |
| |
| #include "common/compile_check_begin.h" |
| |
| bvar::Adder<int64_t> g_sink_write_bytes; |
| bvar::PerSecond<bvar::Adder<int64_t>> g_sink_write_bytes_per_second("sink_throughput_byte", |
| &g_sink_write_bytes, 60); |
| bvar::Adder<int64_t> g_sink_write_rows; |
| bvar::PerSecond<bvar::Adder<int64_t>> g_sink_write_rows_per_second("sink_throughput_row", |
| &g_sink_write_rows, 60); |
| bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms( |
| "load_back_pressure_version_time_ms"); |
| |
| Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets, |
| bool incremental) { |
| SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get()); |
| for (const auto& tablet : tablets) { |
| // First find the location BEs of this tablet |
| auto* tablet_locations = _parent->_location->find_tablet(tablet.tablet_id); |
| if (tablet_locations == nullptr) { |
| return Status::InternalError("unknown tablet, tablet_id={}", tablet.tablet_id); |
| } |
| std::vector<std::shared_ptr<VNodeChannel>> channels; |
| // For each tablet, handle all of its replicas (each located on some node). |
| for (auto& replica_node_id : tablet_locations->node_ids) { |
| std::shared_ptr<VNodeChannel> channel; |
| auto it = _node_channels.find(replica_node_id); |
| // When preparing the TableSink or incrementally opening tablets, we need to init a new NodeChannel. |
| if (it == _node_channels.end()) { |
| // The NodeChannel is not added to _parent->_pool, because destructing a NodeChannel |
| // may take a long time waiting for RPCs to finish, while the ObjectPool holds a |
| // spin lock when deleting objects. |
| channel = |
| std::make_shared<VNodeChannel>(_parent, this, replica_node_id, incremental); |
| _node_channels.emplace(replica_node_id, channel); |
| // A node opened incrementally. When closing, we have to use the two-stage close. |
| if (incremental) { |
| _has_inc_node = true; |
| } |
| VLOG_CRITICAL << "init new node for instance " << _parent->_sender_id |
| << ", node id:" << replica_node_id << ", incremental:" << incremental; |
| } else { |
| channel = it->second; |
| } |
| channel->add_tablet(tablet); |
| if (_parent->_write_single_replica) { |
| auto* slave_location = _parent->_slave_location->find_tablet(tablet.tablet_id); |
| if (slave_location != nullptr) { |
| channel->add_slave_tablet_nodes(tablet.tablet_id, slave_location->node_ids); |
| } |
| } |
| channels.push_back(channel); |
| _tablets_by_channel[replica_node_id].insert(tablet.tablet_id); |
| } |
| _channels_by_tablet.emplace(tablet.tablet_id, std::move(channels)); |
| } |
| for (auto& it : _node_channels) { |
| RETURN_IF_ERROR(it.second->init(state)); |
| } |
| if (_where_clause != nullptr) { |
| RETURN_IF_ERROR(_where_clause->prepare(state, *_parent->_output_row_desc)); |
| RETURN_IF_ERROR(_where_clause->open(state)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| void IndexChannel::mark_as_failed(const VNodeChannel* node_channel, const std::string& err, |
| int64_t tablet_id) { |
| DCHECK(node_channel != nullptr); |
| LOG(INFO) << "mark node_id:" << node_channel->channel_info() << " tablet_id: " << tablet_id |
| << " as failed, err: " << err; |
| auto node_id = node_channel->node_id(); |
| const auto& it = _tablets_by_channel.find(node_id); |
| if (it == _tablets_by_channel.end()) { |
| return; |
| } |
| |
| { |
| std::lock_guard<std::mutex> l(_fail_lock); |
| if (tablet_id == -1) { |
| for (const auto the_tablet_id : it->second) { |
| _failed_channels[the_tablet_id].insert(node_id); |
| _failed_channels_msgs.emplace(the_tablet_id, |
| err + ", host: " + node_channel->host()); |
| if (_failed_channels[the_tablet_id].size() > _max_failed_replicas(the_tablet_id)) { |
| _intolerable_failure_status = Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
| _failed_channels_msgs[the_tablet_id]); |
| } |
| } |
| } else { |
| _failed_channels[tablet_id].insert(node_id); |
| _failed_channels_msgs.emplace(tablet_id, err + ", host: " + node_channel->host()); |
| if (_failed_channels[tablet_id].size() > _max_failed_replicas(tablet_id)) { |
| _intolerable_failure_status = Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
| _failed_channels_msgs[tablet_id]); |
| } |
| } |
| } |
| } |
| |
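| // Maximum number of replicas that may fail for a tablet before the failure becomes intolerable. |
| // If no per-tablet replica info is available (total_replicas_num == 0), fall back to a |
| // majority-based bound derived from the table-level _num_replicas. |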
| int IndexChannel::_max_failed_replicas(int64_t tablet_id) { |
| auto [total_replicas_num, load_required_replicas_num] = |
| _parent->_tablet_replica_info[tablet_id]; |
| int max_failed_replicas = total_replicas_num == 0 |
| ? (_parent->_num_replicas - 1) / 2 |
| : total_replicas_num - load_required_replicas_num; |
| return max_failed_replicas; |
| } |
| |
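| // Number of replicas that must succeed for a tablet's write to count as quorum success. |
| // Falls back to a simple majority of _num_replicas when per-tablet replica info is missing. |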
| int IndexChannel::_load_required_replicas_num(int64_t tablet_id) { |
| auto [total_replicas_num, load_required_replicas_num] = |
| _parent->_tablet_replica_info[tablet_id]; |
| if (total_replicas_num == 0) { |
| return (_parent->_num_replicas + 1) / 2; |
| } |
| return load_required_replicas_num; |
| } |
| |
| Status IndexChannel::check_intolerable_failure() { |
| std::lock_guard<std::mutex> l(_fail_lock); |
| return _intolerable_failure_status; |
| } |
| |
| void IndexChannel::set_error_tablet_in_state(RuntimeState* state) { |
| std::vector<TErrorTabletInfo> error_tablet_infos; |
| |
| { |
| std::lock_guard<std::mutex> l(_fail_lock); |
| for (const auto& it : _failed_channels_msgs) { |
| TErrorTabletInfo error_info; |
| error_info.__set_tabletId(it.first); |
| error_info.__set_msg(it.second); |
| error_tablet_infos.emplace_back(error_info); |
| } |
| } |
| state->add_error_tablet_infos(error_tablet_infos); |
| } |
| |
| void IndexChannel::set_tablets_received_rows( |
| const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id) { |
| for (const auto& [tablet_id, rows_num] : tablets_received_rows) { |
| _tablets_received_rows[tablet_id].emplace_back(node_id, rows_num); |
| } |
| } |
| |
| void IndexChannel::set_tablets_filtered_rows( |
| const std::vector<std::pair<int64_t, int64_t>>& tablets_filtered_rows, int64_t node_id) { |
| for (const auto& [tablet_id, rows_num] : tablets_filtered_rows) { |
| _tablets_filtered_rows[tablet_id].emplace_back(node_id, rows_num); |
| } |
| } |
| |
| Status IndexChannel::check_tablet_received_rows_consistency() { |
| for (auto& tablet : _tablets_received_rows) { |
| for (size_t i = 0; i < tablet.second.size(); i++) { |
| VLOG_NOTICE << "check_tablet_received_rows_consistency, load_id: " << _parent->_load_id |
| << ", txn_id: " << std::to_string(_parent->_txn_id) |
| << ", tablet_id: " << tablet.first |
| << ", node_id: " << tablet.second[i].first |
| << ", rows_num: " << tablet.second[i].second; |
| if (i == 0) { |
| continue; |
| } |
| if (tablet.second[i].second != tablet.second[0].second) { |
| return Status::InternalError( |
| "rows num written by multi replicas doesn't match, load_id={}, txn_id={}, " |
| "tablet_id={}, node_id={}, rows_num={}, node_id={}, rows_num={}", |
| print_id(_parent->_load_id), _parent->_txn_id, tablet.first, |
| tablet.second[i].first, tablet.second[i].second, tablet.second[0].first, |
| tablet.second[0].second); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status IndexChannel::check_tablet_filtered_rows_consistency() { |
| for (auto& tablet : _tablets_filtered_rows) { |
| for (size_t i = 0; i < tablet.second.size(); i++) { |
| VLOG_NOTICE << "check_tablet_filtered_rows_consistency, load_id: " << _parent->_load_id |
| << ", txn_id: " << std::to_string(_parent->_txn_id) |
| << ", tablet_id: " << tablet.first |
| << ", node_id: " << tablet.second[i].first |
| << ", rows_num: " << tablet.second[i].second; |
| if (i == 0) { |
| continue; |
| } |
| if (tablet.second[i].second != tablet.second[0].second) { |
| return Status::InternalError( |
| "rows num filtered by multi replicas doesn't match, load_id={}, txn_id={}, " |
| "tablet_id={}, node_id={}, rows_num={}, node_id={}, rows_num={}", |
| print_id(_parent->_load_id), _parent->_txn_id, tablet.first, |
| tablet.second[i].first, tablet.second[i].second, tablet.second[0].first, |
| tablet.second[0].second); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| static Status cancel_channel_and_check_intolerable_failure(Status status, |
| const std::string& err_msg, |
| IndexChannel& ich, VNodeChannel& nch) { |
| LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << err_msg; |
| ich.mark_as_failed(&nch, err_msg, -1); |
| // cancel the node channel on a best-effort basis |
| nch.cancel(err_msg); |
| |
| // check if index has intolerable failure |
| if (Status index_st = ich.check_intolerable_failure(); !index_st.ok()) { |
| status = std::move(index_st); |
| } else if (Status receive_st = ich.check_tablet_received_rows_consistency(); !receive_st.ok()) { |
| status = std::move(receive_st); |
| } else if (Status filter_st = ich.check_tablet_filtered_rows_consistency(); !filter_st.ok()) { |
| status = std::move(filter_st); |
| } |
| return status; |
| } |
| |
| Status IndexChannel::close_wait( |
| RuntimeState* state, WriterStats* writer_stats, |
| std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map, |
| std::unordered_set<int64_t> unfinished_node_channel_ids, |
| bool need_wait_after_quorum_success) { |
| DBUG_EXECUTE_IF("IndexChannel.close_wait.timeout", |
| { return Status::TimedOut("injected timeout"); }); |
| Status status = Status::OK(); |
| // 1. wait quorum success |
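| // Collect every tablet belonging to the partitions actually written by this load; |
| // quorum success is evaluated only against these tablets. |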
| std::unordered_set<int64_t> need_finish_tablets; |
| auto partition_ids = _parent->_tablet_finder->partition_ids(); |
| for (const auto& part : _parent->_vpartition->get_partitions()) { |
| if (partition_ids.contains(part->id)) { |
| for (const auto& index : part->indexes) { |
| for (const auto& tablet_id : index.tablets) { |
| need_finish_tablets.insert(tablet_id); |
| } |
| } |
| } |
| } |
| while (true) { |
| RETURN_IF_ERROR(check_each_node_channel_close( |
| &unfinished_node_channel_ids, node_add_batch_counter_map, writer_stats, status)); |
| bool quorum_success = _quorum_success(unfinished_node_channel_ids, need_finish_tablets); |
| if (unfinished_node_channel_ids.empty() || quorum_success) { |
| LOG(INFO) << "quorum_success: " << quorum_success |
| << ", is all finished: " << unfinished_node_channel_ids.empty() |
| << ", txn_id: " << _parent->_txn_id |
| << ", load_id: " << print_id(_parent->_load_id); |
| break; |
| } |
| bthread_usleep(1000 * 10); |
| } |
| |
| // 2. wait for all node channel to complete as much as possible |
| if (!unfinished_node_channel_ids.empty() && need_wait_after_quorum_success) { |
| int64_t arrival_quorum_success_time = UnixMillis(); |
| int64_t max_wait_time_ms = _calc_max_wait_time_ms(unfinished_node_channel_ids); |
| while (true) { |
| RETURN_IF_ERROR(check_each_node_channel_close(&unfinished_node_channel_ids, |
| node_add_batch_counter_map, writer_stats, |
| status)); |
| if (unfinished_node_channel_ids.empty()) { |
| break; |
| } |
| int64_t elapsed_ms = UnixMillis() - arrival_quorum_success_time; |
| if (elapsed_ms > max_wait_time_ms || |
| _parent->_load_channel_timeout_s - elapsed_ms / 1000 < |
| config::quorum_success_remaining_timeout_seconds) { |
| // cancel unfinished node channel |
| std::stringstream unfinished_node_channel_host_str; |
| for (auto& it : unfinished_node_channel_ids) { |
| unfinished_node_channel_host_str << _node_channels[it]->host() << ","; |
| _node_channels[it]->cancel("timeout"); |
| } |
| LOG(WARNING) << "reach max wait time, max_wait_time_ms: " << max_wait_time_ms |
| << ", cancel unfinished node channel and finish close" |
| << ", load id: " << print_id(_parent->_load_id) |
| << ", txn_id: " << _parent->_txn_id << ", unfinished node channel: " |
| << unfinished_node_channel_host_str.str(); |
| break; |
| } |
| bthread_usleep(1000 * 10); |
| } |
| } |
| return status; |
| } |
| |
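| // Poll each still-unfinished node channel once: if its close has completed, collect its |
| // stats/counters and remove it from the unfinished set. Any failure is folded into the |
| // returned status after cancelling the offending channel and re-checking index tolerance. |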
| Status IndexChannel::check_each_node_channel_close( |
| std::unordered_set<int64_t>* unfinished_node_channel_ids, |
| std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map, |
| WriterStats* writer_stats, Status status) { |
| Status final_status = Status::OK(); |
| for (auto& it : _node_channels) { |
| std::shared_ptr<VNodeChannel> node_channel = it.second; |
| // If the node channel is not in the unfinished_node_channel_ids, |
| // it means the node channel is already closed. |
| if (!unfinished_node_channel_ids->contains(it.first)) { |
| continue; |
| } |
| bool node_channel_closed = false; |
| auto close_status = it.second->close_wait(_parent->_state, &node_channel_closed); |
| if (node_channel_closed) { |
| close_status = it.second->after_close_handle(_parent->_state, writer_stats, |
| node_add_batch_counter_map); |
| unfinished_node_channel_ids->erase(it.first); |
| } |
| DBUG_EXECUTE_IF("IndexChannel.check_each_node_channel_close.close_status_not_ok", |
| { close_status = Status::InternalError("injected close status not ok"); }); |
| if (!close_status.ok()) { |
| final_status = cancel_channel_and_check_intolerable_failure( |
| std::move(final_status), close_status.to_string(), *this, *it.second); |
| } |
| } |
| |
| return final_status; |
| } |
| |
| bool IndexChannel::_quorum_success(const std::unordered_set<int64_t>& unfinished_node_channel_ids, |
| const std::unordered_set<int64_t>& need_finish_tablets) { |
| if (!config::enable_quorum_success_write) { |
| return false; |
| } |
| if (need_finish_tablets.empty()) [[unlikely]] { |
| return false; |
| } |
| |
| // 1. collect all write tablets and finished tablets |
| std::unordered_map<int64_t, int64_t> finished_tablets_replica; |
| for (const auto& [node_id, node_channel] : _node_channels) { |
| if (unfinished_node_channel_ids.contains(node_id) || !node_channel->check_status().ok()) { |
| continue; |
| } |
| for (const auto& tablet_id : _tablets_by_channel[node_id]) { |
| // Only count non-gap backends for quorum success. |
| // Gap backends' success doesn't count toward majority write. |
| auto gap_it = _parent->_tablet_version_gap_backends.find(tablet_id); |
| if (gap_it == _parent->_tablet_version_gap_backends.end() || |
| gap_it->second.find(node_id) == gap_it->second.end()) { |
| finished_tablets_replica[tablet_id]++; |
| } |
| } |
| } |
| |
| // 2. check if quorum success |
| for (const auto& tablet_id : need_finish_tablets) { |
| if (finished_tablets_replica[tablet_id] < _load_required_replicas_num(tablet_id)) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| int64_t IndexChannel::_calc_max_wait_time_ms( |
| const std::unordered_set<int64_t>& unfinished_node_channel_ids) { |
| // 1. calculate avg speed of all unfinished node channel |
| int64_t elapsed_ms = UnixMillis() - _start_time; |
| int64_t total_bytes = 0; |
| int finished_count = 0; |
| for (const auto& [node_id, node_channel] : _node_channels) { |
| if (unfinished_node_channel_ids.contains(node_id)) { |
| continue; |
| } |
| total_bytes += node_channel->write_bytes(); |
| finished_count++; |
| } |
| // no data loaded in index channel, return 0 |
| if (total_bytes == 0 || finished_count == 0) { |
| return 0; |
| } |
| // if elapsed_ms is 0, the loaded data is too small to estimate a speed; use the minimum wait |
| if (elapsed_ms <= 0) { |
| return config::quorum_success_min_wait_seconds * 1000; |
| } |
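| // Average per-channel throughput of the finished channels, in bytes per millisecond. |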
| double avg_speed = |
| static_cast<double>(total_bytes) / (static_cast<double>(elapsed_ms) * finished_count); |
| |
| // 2. calculate max wait time of each unfinished node channel and return the max value |
| int64_t max_wait_time_ms = 0; |
| for (int64_t id : unfinished_node_channel_ids) { |
| int64_t bytes = _node_channels[id]->write_bytes(); |
| int64_t wait = |
| avg_speed > 0 ? static_cast<int64_t>(static_cast<double>(bytes) / avg_speed) : 0; |
| max_wait_time_ms = std::max(max_wait_time_ms, wait); |
| } |
| |
| // 3. calculate max wait time |
| // introduce quorum_success_min_wait_seconds to avoid jitter for small loads |
| max_wait_time_ms -= UnixMillis() - _start_time; |
| max_wait_time_ms = |
| std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) * |
| (1.0 + config::quorum_success_max_wait_multiplier)), |
| config::quorum_success_min_wait_seconds * 1000); |
| |
| return max_wait_time_ms; |
| } |
| |
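| // Helper: returns OK if none of the given flags is set; otherwise returns an Uninitialized |
| // status whose message encodes the flag values, e.g. "0/1" for {false, true}. |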
| static Status none_of(std::initializer_list<bool> vars) { |
| bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return var; }); |
| Status st = Status::OK(); |
| if (!none) { |
| std::string vars_str; |
| std::for_each(vars.begin(), vars.end(), |
| [&vars_str](bool var) -> void { vars_str += (var ? "1/" : "0/"); }); |
| if (!vars_str.empty()) { |
| vars_str.pop_back(); // 0/1/0/ -> 0/1/0 |
| } |
| st = Status::Uninitialized(vars_str); |
| } |
| |
| return st; |
| } |
| |
| VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, int64_t node_id, |
| bool is_incremental) |
| : _parent(parent), |
| _index_channel(index_channel), |
| _node_id(node_id), |
| _is_incremental(is_incremental) { |
| _cur_add_block_request = std::make_shared<PTabletWriterAddBlockRequest>(); |
| _node_channel_tracker = std::make_shared<MemTracker>( |
| fmt::format("NodeChannel:indexID={}:threadId={}", |
| std::to_string(_index_channel->_index_id), ThreadContext::get_thread_id())); |
| _load_mem_limit = MemInfo::mem_limit() * config::load_process_max_memory_limit_percent / 100; |
| } |
| |
| VNodeChannel::~VNodeChannel() = default; |
| |
| void VNodeChannel::clear_all_blocks() { |
| std::lock_guard<std::mutex> lg(_pending_batches_lock); |
| std::queue<AddBlockReq> empty; |
| std::swap(_pending_blocks, empty); |
| _cur_mutable_block.reset(); |
| } |
| |
| // We don't need to send a tablet_writer_cancel rpc request when |
| // init fails, so set _is_closed to true. |
| // If "_cancelled" is set to true, there is no need to set _cancel_msg, |
| // because the error will be returned directly via the "TabletSink::prepare()" method. |
| Status VNodeChannel::init(RuntimeState* state) { |
| if (_inited) { |
| return Status::OK(); |
| } |
| |
| SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); |
| _task_exec_ctx = state->get_task_execution_context(); |
| _tuple_desc = _parent->_output_tuple_desc; |
| _state = state; |
| // get corresponding BE node. |
| const auto* node = _parent->_nodes_info->find_node(_node_id); |
| if (node == nullptr) { |
| _cancelled = true; |
| _is_closed = true; |
| return Status::InternalError("unknown node id, id={}", _node_id); |
| } |
| _node_info = *node; |
| |
| _load_info = "load_id=" + print_id(_parent->_load_id) + |
| ", txn_id=" + std::to_string(_parent->_txn_id); |
| |
| _row_desc = std::make_unique<RowDescriptor>(_tuple_desc); |
| _batch_size = state->batch_size(); |
| |
| _stub = state->exec_env()->brpc_internal_client_cache()->get_client(_node_info.host, |
| _node_info.brpc_port); |
| if (_stub == nullptr) { |
| _cancelled = true; |
| _is_closed = true; |
| return Status::InternalError("Get rpc stub failed, host={}, port={}, info={}", |
| _node_info.host, _node_info.brpc_port, channel_info()); |
| } |
| |
| _rpc_timeout_ms = state->execution_timeout() * 1000; |
| _timeout_watch.start(); |
| |
| // Initialize _cur_add_block_request |
| if (!_cur_add_block_request->has_id()) { |
| *(_cur_add_block_request->mutable_id()) = _parent->_load_id; |
| } |
| _cur_add_block_request->set_index_id(_index_channel->_index_id); |
| _cur_add_block_request->set_sender_id(_parent->_sender_id); |
| _cur_add_block_request->set_backend_id(_node_id); |
| _cur_add_block_request->set_eos(false); |
| |
| // add block closure |
| // Have to capture _task_exec_ctx by value because the tablet writer may be destroyed during the callback. |
| _send_block_callback = WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared(); |
| _send_block_callback->addFailedHandler( |
| [&, task_exec_ctx = _task_exec_ctx](const WriteBlockCallbackContext& ctx) { |
| std::shared_ptr<TaskExecutionContext> ctx_lock = task_exec_ctx.lock(); |
| if (ctx_lock == nullptr) { |
| return; |
| } |
| _add_block_failed_callback(ctx); |
| }); |
| |
| _send_block_callback->addSuccessHandler( |
| [&, task_exec_ctx = _task_exec_ctx](const PTabletWriterAddBlockResult& result, |
| const WriteBlockCallbackContext& ctx) { |
| std::shared_ptr<TaskExecutionContext> ctx_lock = task_exec_ctx.lock(); |
| if (ctx_lock == nullptr) { |
| return; |
| } |
| _add_block_success_callback(result, ctx); |
| }); |
| |
| _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id); |
| // The node channel sends _batch_size rows of data in each rpc. When the |
| // number of tablets is large, each tablet receives only a few rows per rpc, |
| // and the TabletsChannel has to traverse every tablet for the import, |
| // so the import performance is poor. Therefore, we set _batch_size to |
| // a relatively large value to improve the import performance. |
| _batch_size = std::max(_batch_size, 8192); |
| |
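| // Record the workload group id of the query (if any); it is passed to the BE in the open request. |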
| if (_state) { |
| QueryContext* query_ctx = _state->get_query_ctx(); |
| if (query_ctx) { |
| auto wg_ptr = query_ctx->workload_group(); |
| if (wg_ptr) { |
| _wg_id = wg_ptr->id(); |
| } |
| } |
| } |
| |
| _inited = true; |
| return Status::OK(); |
| } |
| |
| void VNodeChannel::_open_internal(bool is_incremental) { |
| if (_tablets_wait_open.empty()) { |
| return; |
| } |
| SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); |
| auto request = std::make_shared<PTabletWriterOpenRequest>(); |
| request->set_allocated_id(&_parent->_load_id); |
| request->set_index_id(_index_channel->_index_id); |
| request->set_txn_id(_parent->_txn_id); |
| request->set_sender_id(_parent->_sender_id); |
| request->set_allocated_schema(_parent->_schema->to_protobuf()); |
| if (_parent->_t_sink.olap_table_sink.__isset.storage_vault_id) { |
| request->set_storage_vault_id(_parent->_t_sink.olap_table_sink.storage_vault_id); |
| } |
| std::set<int64_t> deduper; |
| for (auto& tablet : _tablets_wait_open) { |
| if (deduper.contains(tablet.tablet_id)) { |
| continue; |
| } |
| auto* ptablet = request->add_tablets(); |
| ptablet->set_partition_id(tablet.partition_id); |
| ptablet->set_tablet_id(tablet.tablet_id); |
| deduper.insert(tablet.tablet_id); |
| _all_tablets.push_back(std::move(tablet)); |
| } |
| _tablets_wait_open.clear(); |
| |
| request->set_num_senders(_parent->_num_senders); |
| request->set_need_gen_rollup(false); // Useless but it is a required field in pb |
| request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s); |
| request->set_is_high_priority(_parent->_is_high_priority); |
| request->set_sender_ip(BackendOptions::get_localhost()); |
| request->set_is_vectorized(true); |
| request->set_backend_id(_node_id); |
| request->set_enable_profile(_state->enable_profile()); |
| request->set_is_incremental(is_incremental); |
| request->set_txn_expiration(_parent->_txn_expiration); |
| request->set_write_file_cache(_parent->_write_file_cache); |
| |
| if (_wg_id > 0) { |
| request->set_workload_group_id(_wg_id); |
| } |
| |
| auto open_callback = DummyBrpcCallback<PTabletWriterOpenResult>::create_shared(); |
| auto open_closure = AutoReleaseClosure< |
| PTabletWriterOpenRequest, |
| DummyBrpcCallback<PTabletWriterOpenResult>>::create_unique(request, open_callback); |
| open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); |
| if (config::tablet_writer_ignore_eovercrowded) { |
| open_callback->cntl_->ignore_eovercrowded(); |
| } |
| VLOG_DEBUG << fmt::format("txn {}: open NodeChannel to {}, incremental: {}, senders: {}", |
| _parent->_txn_id, _node_id, is_incremental, _parent->_num_senders); |
| // The real transmission happens here. The corresponding BE's load manager will open a load channel for it. |
| _stub->tablet_writer_open(open_closure->cntl_.get(), open_closure->request_.get(), |
| open_closure->response_.get(), open_closure.get()); |
| open_closure.release(); |
| _open_callbacks.push_back(open_callback); |
| |
| static_cast<void>(request->release_id()); |
| static_cast<void>(request->release_schema()); |
| } |
| |
| void VNodeChannel::open() { |
| _open_internal(false); |
| } |
| |
| void VNodeChannel::incremental_open() { |
| VLOG_DEBUG << "incremental opening node channel" << _node_id; |
| _open_internal(true); |
| } |
| |
| Status VNodeChannel::open_wait() { |
| Status status; |
| for (auto& open_callback : _open_callbacks) { |
| // Because of incremental open, we may wait multiple times. Skip the callbacks which have been checked and set to nullptr in previous rounds. |
| if (open_callback == nullptr) { |
| continue; |
| } |
| |
| open_callback->join(); |
| SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); |
| if (open_callback->cntl_->Failed()) { |
| if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( |
| _stub, _node_info.host, _node_info.brpc_port)) { |
| ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( |
| open_callback->cntl_->remote_side()); |
| } |
| _cancelled = true; |
| auto error_code = open_callback->cntl_->ErrorCode(); |
| auto error_text = open_callback->cntl_->ErrorText(); |
| if (error_text.find("Reached timeout") != std::string::npos) { |
| LOG(WARNING) << "failed to open tablet writer, possibly caused by timeout. Increase BE " |
| "config `tablet_writer_open_rpc_timeout_sec` if you are sure that " |
| "your table building and data are reasonable."; |
| } |
| return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
| "failed to open tablet writer, error={}, error_text={}, info={}", |
| berror(error_code), error_text, channel_info()); |
| } |
| status = Status::create(open_callback->response_->status()); |
| |
| if (!status.ok()) { |
| _cancelled = true; |
| return status; |
| } |
| } |
| |
| return status; |
| } |
| |
| Status VNodeChannel::add_block(Block* block, const Payload* payload) { |
| SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); |
| if (payload->second.empty()) { |
| return Status::OK(); |
| } |
| // If add_block() is called when _eos_is_produced==true, something must be wrong; we can only mark this channel as failed. |
| auto st = none_of({_cancelled, _eos_is_produced}); |
| if (!st.ok()) { |
| if (_cancelled) { |
| std::lock_guard<std::mutex> l(_cancel_msg_lock); |
| return Status::Error<ErrorCode::INTERNAL_ERROR, false>("add row failed. {}", |
| _cancel_msg); |
| } else { |
| return std::move(st.prepend("already stopped, can't add row. cancelled/eos: ")); |
| } |
| } |
| |
| // We use the OlapTableSink mem_tracker, which has the same ancestor as the _plan node, |
| // so in the ideal case the mem limit is a matter for the _plan node. |
| // But there are still some unfinished things, so we enforce the mem limit here temporarily. |
| // _cancelled may be set by the rpc callback, and it is possible that _cancelled gets set in any of the steps below. |
| // It's fine to do a fake add_block() and return OK, because we will check _cancelled in the next add_block() or mark_close(). |
| constexpr int64_t kBackPressureSleepMs = 10; |
| auto* memtable_limiter = ExecEnv::GetInstance()->memtable_memory_limiter(); |
| while (true) { |
| bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit(); |
| int64_t memtable_mem = |
| (memtable_limiter != nullptr && memtable_limiter->mem_tracker() != nullptr) |
| ? memtable_limiter->mem_tracker()->consumption() |
| : 0; |
| // Note: Memtable memory is not included in load memory statistics (MemoryProfile::load_current_usage()) |
| // for performance and memory control complexity reasons. Therefore, we explicitly add memtable memory |
| // consumption here to ensure accurate back pressure decisions and prevent OOM during heavy loads. |
| auto current_load_mem_value = MemoryProfile::load_current_usage() + memtable_mem; |
| bool mem_limit_exceeded = is_exceed_soft_mem_limit || |
| current_load_mem_value > _load_mem_limit || |
| _pending_batches_bytes > _max_pending_batches_bytes; |
| bool need_back_pressure = !_cancelled && !_state->is_cancelled() && |
| _pending_batches_num > 0 && mem_limit_exceeded; |
| if (!need_back_pressure) { |
| break; |
| } |
| SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns); |
| std::this_thread::sleep_for(std::chrono::milliseconds(kBackPressureSleepMs)); |
| } |
| |
| if (UNLIKELY(!_cur_mutable_block)) { |
| _cur_mutable_block = MutableBlock::create_unique(block->clone_empty()); |
| } |
| |
| SCOPED_RAW_TIMER(&_stat.append_node_channel_ns); |
| st = block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first)); |
| if (!st.ok()) { |
| _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); |
| return st; |
| } |
| for (auto tablet_id : payload->second) { |
| _cur_add_block_request->add_tablet_ids(tablet_id); |
| } |
| _write_bytes.fetch_add(_cur_mutable_block->bytes()); |
| |
| if (_cur_mutable_block->rows() >= _batch_size || |
| _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) { |
| { |
| SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); |
| std::lock_guard<std::mutex> l(_pending_batches_lock); |
| // To simplify the add_row logic, postpone adding the block into the request until the request is actually sent |
| _pending_batches_bytes += _cur_mutable_block->allocated_bytes(); |
| _cur_add_block_request->set_eos( |
| false); // for multi-add, eos is only set when marking close. |
| // Copy the request into a temporary request and add it to the pending block queue |
| auto tmp_add_block_request = std::make_shared<PTabletWriterAddBlockRequest>(); |
| *tmp_add_block_request = *_cur_add_block_request; |
| _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request); |
| _pending_batches_num++; |
| VLOG_DEBUG << "VTabletWriter:" << _parent << " VNodeChannel:" << this |
| << " pending_batches_bytes:" << _pending_batches_bytes |
| << " jobid:" << std::to_string(_state->load_job_id()) |
| << " loadinfo:" << _load_info; |
| } |
| _cur_mutable_block = MutableBlock::create_unique(block->clone_empty()); |
| _cur_add_block_request->clear_tablet_ids(); |
| } |
| |
| return Status::OK(); |
| } |
| |
| static void injection_full_gc_fn() { |
| MemoryReclamation::revoke_process_memory("injection_full_gc_fn"); |
| } |
| |
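| // Called periodically by the sender thread. Submits a pending block (if any) to the |
| // send-batch thread pool unless an rpc is already in flight. Returns 1 if this channel may |
| // still have data to send (keep polling), 0 once it is finished or cancelled. |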
| int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, |
| std::unique_ptr<ThreadPoolToken>& thread_pool_token) { |
| DBUG_EXECUTE_IF("VNodeChannel.try_send_and_fetch_status_full_gc", { |
| std::thread t(injection_full_gc_fn); |
| t.join(); |
| }); |
| |
| if (_cancelled || _send_finished) { // not run |
| return 0; |
| } |
| |
| auto load_back_pressure_version_wait_time_ms = _load_back_pressure_version_wait_time_ms.load(); |
| if (UNLIKELY(load_back_pressure_version_wait_time_ms > 0)) { |
| std::this_thread::sleep_for( |
| std::chrono::milliseconds(load_back_pressure_version_wait_time_ms)); |
| _load_back_pressure_version_block_ms.fetch_add( |
| load_back_pressure_version_wait_time_ms); // already in milliseconds |
| _load_back_pressure_version_wait_time_ms = 0; |
| } |
| |
| // set closure for sending block. |
| if (!_send_block_callback->try_set_in_flight()) { |
| // There is a packet in flight, skip. |
| return _send_finished ? 0 : 1; |
| } |
| |
| // We are sure that try_send_batch is not running |
| if (_pending_batches_num > 0) { |
| auto s = thread_pool_token->submit_func([this, state] { try_send_pending_block(state); }); |
| if (!s.ok()) { |
| _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed"); |
| // sending finished. clear in flight |
| _send_block_callback->clear_in_flight(); |
| } |
| // in_flight is cleared in closure::Run |
| } else { |
| // sending finished. clear in flight |
| _send_block_callback->clear_in_flight(); |
| } |
| return _send_finished ? 0 : 1; |
| } |
| |
| void VNodeChannel::_cancel_with_msg(const std::string& msg) { |
| LOG(WARNING) << "cancel node channel " << channel_info() << ", error message: " << msg; |
| { |
| std::lock_guard<std::mutex> l(_cancel_msg_lock); |
| if (_cancel_msg.empty()) { |
| _cancel_msg = msg; |
| } |
| } |
| _cancelled = true; |
| } |
| |
| void VNodeChannel::_refresh_back_pressure_version_wait_time( |
| const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>& |
| tablet_load_infos) { |
| int64_t max_rowset_num_gap = 0; |
| // If any tablet is under high load pressure, we make the whole procedure |
| // sleep to prevent the corresponding BE from returning -235 |
| std::for_each( |
| tablet_load_infos.begin(), tablet_load_infos.end(), |
| [&max_rowset_num_gap](auto& load_info) { |
| int64_t cur_rowset_num = load_info.current_rowset_nums(); |
| int64_t high_load_point = load_info.max_config_rowset_nums() * |
| (config::load_back_pressure_version_threshold / 100); |
| DCHECK(cur_rowset_num > high_load_point); |
| max_rowset_num_gap = std::max(max_rowset_num_gap, cur_rowset_num - high_load_point); |
| }); |
| // To slow down the high load pressure, we use the rowset num gap to calculate a sleep time. |
| // For example: |
| // if the max tablet version is 2000 and there are 3 BEs |
| // A: ==================== 1800 |
| // B: =================== 1700 |
| // C: ================== 1600 |
| // ================== 1600 |
| // ^ |
| // the high load point |
| // then the max gap is 1800 - (max tablet version * config::load_back_pressure_version_threshold / 100) = 200, |
| // and we make the whole send procedure sleep gap + 1000 = 1200 ms (capped at |
| // max_load_back_pressure_version_wait_time_ms) so that compaction can catch up and reduce the pressure. |
| auto max_time = config::max_load_back_pressure_version_wait_time_ms; |
| if (UNLIKELY(max_rowset_num_gap > 0)) { |
| _load_back_pressure_version_wait_time_ms.store( |
| std::min(max_rowset_num_gap + 1000, max_time)); |
| LOG(INFO) << "try to back pressure version, wait time(ms): " |
| << _load_back_pressure_version_wait_time_ms |
| << ", load id: " << print_id(_parent->_load_id) |
| << ", max_rowset_num_gap: " << max_rowset_num_gap; |
| } |
| } |
| |
| void VNodeChannel::try_send_pending_block(RuntimeState* state) { |
| SCOPED_ATTACH_TASK(state); |
| SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker); |
| SCOPED_ATOMIC_TIMER(&_actual_consume_ns); |
| signal::set_signal_task_id(_parent->_load_id); |
| AddBlockReq send_block; |
| { |
| std::lock_guard<std::mutex> l(_pending_batches_lock); |
| DCHECK(!_pending_blocks.empty()); |
| send_block = std::move(_pending_blocks.front()); |
| _pending_blocks.pop(); |
| _pending_batches_num--; |
| _pending_batches_bytes -= send_block.first->allocated_bytes(); |
| } |
| |
| auto mutable_block = std::move(send_block.first); |
| auto request = std::move(send_block.second); // doesn't need to be saved in heap |
| |
| // tablet_ids were already set when adding rows |
| request->set_packet_seq(_next_packet_seq); |
| auto block = mutable_block->to_block(); |
| CHECK(block.rows() == request->tablet_ids_size()) |
| << "block rows: " << block.rows() |
| << ", tablet_ids_size: " << request->tablet_ids_size(); |
| if (block.rows() > 0) { |
| SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); |
| size_t uncompressed_bytes = 0, compressed_bytes = 0; |
| int64_t compressed_time = 0; |
| Status st = block.serialize(state->be_exec_version(), request->mutable_block(), |
| &uncompressed_bytes, &compressed_bytes, &compressed_time, |
| state->fragement_transmission_compression_type(), |
| _parent->_transfer_large_data_by_brpc); |
| TEST_INJECTION_POINT_CALLBACK("VNodeChannel::try_send_block", &st); |
| if (!st.ok()) { |
| cancel(fmt::format("{}, err: {}", channel_info(), st.to_string())); |
| _send_block_callback->clear_in_flight(); |
| return; |
| } |
| if (double(compressed_bytes) >= double(config::brpc_max_body_size) * 0.95F) { |
| LOG(WARNING) << "send block too large, this rpc may failed. send size: " |
| << compressed_bytes << ", threshold: " << config::brpc_max_body_size |
| << ", " << channel_info(); |
| } |
| } |
| |
| auto remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; |
| if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { |
| if (remain_ms <= 0 && !request->eos()) { |
| cancel(fmt::format("{}, err: load timeout after {} ms", channel_info(), |
| _rpc_timeout_ms)); |
| _send_block_callback->clear_in_flight(); |
| return; |
| } else { |
| remain_ms = config::min_load_rpc_timeout_ms; |
| } |
| } |
| |
| _send_block_callback->reset(); |
| _send_block_callback->cntl_->set_timeout_ms(remain_ms); |
| if (config::tablet_writer_ignore_eovercrowded) { |
| _send_block_callback->cntl_->ignore_eovercrowded(); |
| } |
| |
| if (request->eos()) { |
| for (auto pid : _parent->_tablet_finder->partition_ids()) { |
| request->add_partition_ids(pid); |
| } |
| |
| request->set_write_single_replica(_parent->_write_single_replica); |
| if (_parent->_write_single_replica) { |
| for (auto& _slave_tablet_node : _slave_tablet_nodes) { |
| PSlaveTabletNodes slave_tablet_nodes; |
| for (auto node_id : _slave_tablet_node.second) { |
| const auto* node = _parent->_nodes_info->find_node(node_id); |
| DBUG_EXECUTE_IF("VNodeChannel.try_send_pending_block.slave_node_not_found", { |
| LOG(WARNING) << "trigger " |
| "VNodeChannel.try_send_pending_block.slave_node_not_found " |
| "debug point will set node to nullptr"; |
| node = nullptr; |
| }); |
| if (node == nullptr) { |
| LOG(WARNING) << "slave node not found, node_id=" << node_id; |
| cancel(fmt::format("slave node not found, node_id={}", node_id)); |
| _send_block_callback->clear_in_flight(); |
| return; |
| } |
| PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes(); |
| pnode->set_id(node->id); |
| pnode->set_option(node->option); |
| pnode->set_host(node->host); |
| pnode->set_async_internal_port(node->brpc_port); |
| } |
| request->mutable_slave_tablet_nodes()->insert( |
| {_slave_tablet_node.first, slave_tablet_nodes}); |
| } |
| } |
| |
| // The eos request must be the last request. It's a signal that makes the callback function set _add_batches_finished to true. |
| // end_mark() makes is_last_rpc true when the rpc finishes and the callbacks are invoked. |
| _send_block_callback->end_mark(); |
| _send_finished = true; |
| CHECK(_pending_batches_num == 0) << _pending_batches_num; |
| } |
| |
| auto send_block_closure = AutoReleaseClosure< |
| PTabletWriterAddBlockRequest, |
| WriteBlockCallback<PTabletWriterAddBlockResult>>::create_unique(request, |
| _send_block_callback); |
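| // For very large blocks (when _transfer_large_data_by_brpc is enabled), the column data is moved |
| // into an rpc attachment and sent through the http-based endpoint, presumably to avoid hitting |
| // brpc's max body size limit on the regular protobuf channel. |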
| if (_parent->_transfer_large_data_by_brpc && request->has_block() && |
| request->block().has_column_values() && request->ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { |
| Status st = request_embed_attachment_contain_blockv2(send_block_closure->request_.get(), |
| send_block_closure); |
| if (!st.ok()) { |
| cancel(fmt::format("{}, err: {}", channel_info(), st.to_string())); |
| _send_block_callback->clear_in_flight(); |
| return; |
| } |
| |
| std::string host = _node_info.host; |
| auto dns_cache = ExecEnv::GetInstance()->dns_cache(); |
| if (dns_cache == nullptr) { |
| LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve"; |
| } else if (!is_valid_ip(_node_info.host)) { |
| Status status = dns_cache->get(_node_info.host, &host); |
| if (!status.ok()) { |
| LOG(WARNING) << "failed to get ip from host " << _node_info.host << ": " |
| << status.to_string(); |
| cancel(fmt::format("failed to get ip from host {}", _node_info.host)); |
| _send_block_callback->clear_in_flight(); |
| return; |
| } |
| } |
| //format an ipv6 address |
| std::string brpc_url = get_brpc_http_url(host, _node_info.brpc_port); |
| std::shared_ptr<PBackendService_Stub> _brpc_http_stub = |
| _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, |
| "http"); |
| if (_brpc_http_stub == nullptr) { |
| cancel(fmt::format("{}, failed to open brpc http client to {}", channel_info(), |
| brpc_url)); |
| _send_block_callback->clear_in_flight(); |
| return; |
| } |
| _send_block_callback->cntl_->http_request().uri() = |
| brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http"; |
| _send_block_callback->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST); |
| _send_block_callback->cntl_->http_request().set_content_type("application/json"); |
| |
| { |
| _brpc_http_stub->tablet_writer_add_block_by_http( |
| send_block_closure->cntl_.get(), nullptr, send_block_closure->response_.get(), |
| send_block_closure.get()); |
| send_block_closure.release(); |
| } |
| } else { |
| _send_block_callback->cntl_->http_request().Clear(); |
| { |
| _stub->tablet_writer_add_block( |
| send_block_closure->cntl_.get(), send_block_closure->request_.get(), |
| send_block_closure->response_.get(), send_block_closure.get()); |
| send_block_closure.release(); |
| } |
| } |
| |
| _next_packet_seq++; |
| } |
| |
| void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult& result, |
| const WriteBlockCallbackContext& ctx) { |
| std::lock_guard<std::mutex> l(this->_closed_lock); |
| if (this->_is_closed) { |
| // if the node channel is closed, no need to call the following logic, |
| // and notice that _index_channel may already be destroyed. |
| return; |
| } |
| SCOPED_ATTACH_TASK(_state); |
| Status status(Status::create(result.status())); |
| if (status.ok()) { |
| _refresh_back_pressure_version_wait_time(result.tablet_load_rowset_num_infos()); |
| // if there are error tablets, handle them first |
| for (const auto& error : result.tablet_errors()) { |
| _index_channel->mark_as_failed(this, "tablet error: " + error.msg(), error.tablet_id()); |
| } |
| |
| Status st = _index_channel->check_intolerable_failure(); |
| if (!st.ok()) { |
| _cancel_with_msg(st.to_string()); |
| } else if (ctx._is_last_rpc) { |
| bool skip_tablet_info = false; |
| DBUG_EXECUTE_IF("VNodeChannel.add_block_success_callback.incomplete_commit_info", |
| { skip_tablet_info = true; }); |
| for (const auto& tablet : result.tablet_vec()) { |
| DBUG_EXECUTE_IF("VNodeChannel.add_block_success_callback.incomplete_commit_info", { |
| if (skip_tablet_info) { |
| LOG(INFO) << "skip tablet info: " << tablet.tablet_id(); |
| skip_tablet_info = false; |
| continue; |
| } |
| }); |
| TTabletCommitInfo commit_info; |
| commit_info.tabletId = tablet.tablet_id(); |
| commit_info.backendId = _node_id; |
| _tablet_commit_infos.emplace_back(std::move(commit_info)); |
| if (tablet.has_received_rows()) { |
| _tablets_received_rows.emplace_back(tablet.tablet_id(), tablet.received_rows()); |
| } |
| if (tablet.has_num_rows_filtered()) { |
| _tablets_filtered_rows.emplace_back(tablet.tablet_id(), |
| tablet.num_rows_filtered()); |
| } |
| VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id() |
| << ", backendId=" << _node_id |
| << ", master node id: " << this->node_id() |
| << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id; |
| } |
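| // In single-replica write mode, the master replica reports which slave replicas it has |
| // successfully synced; record commit infos for those slave replicas as well. |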
| if (_parent->_write_single_replica) { |
| for (const auto& tablet_slave_node_ids : result.success_slave_tablet_node_ids()) { |
| for (auto slave_node_id : tablet_slave_node_ids.second.slave_node_ids()) { |
| TTabletCommitInfo commit_info; |
| commit_info.tabletId = tablet_slave_node_ids.first; |
| commit_info.backendId = slave_node_id; |
| _tablet_commit_infos.emplace_back(std::move(commit_info)); |
| VLOG_CRITICAL |
| << "slave replica commit info: tabletId=" |
| << tablet_slave_node_ids.first << ", backendId=" << slave_node_id |
| << ", master node id: " << this->node_id() |
| << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id; |
| } |
| } |
| } |
| _add_batches_finished = true; |
| } |
| } else { |
| _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}", |
| channel_info(), status.to_string())); |
| } |
| |
| if (result.has_execution_time_us()) { |
| _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); |
| _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us(); |
| _add_batch_counter.add_batch_num++; |
| } |
| if (result.has_load_channel_profile()) { |
| TRuntimeProfileTree tprofile; |
| const auto* buf = (const uint8_t*)result.load_channel_profile().data(); |
| auto len = cast_set<uint32_t>(result.load_channel_profile().size()); |
| auto st = deserialize_thrift_msg(buf, &len, false, &tprofile); |
| if (st.ok()) { |
| _state->load_channel_profile()->update(tprofile); |
| } else { |
| LOG(WARNING) << "load channel TRuntimeProfileTree deserialize failed, errmsg=" << st; |
| } |
| } |
| } |
| |
| void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& ctx) { |
| std::lock_guard<std::mutex> l(this->_closed_lock); |
| if (this->_is_closed) { |
| // if the node channel is closed, no need to call `mark_as_failed`, |
| // and notice that _index_channel may already be destroyed. |
| return; |
| } |
| SCOPED_ATTACH_TASK(_state); |
| // If rpc failed, mark all tablets on this node channel as failed |
| _index_channel->mark_as_failed(this, |
| fmt::format("rpc failed, error code:{}, error text:{}", |
| _send_block_callback->cntl_->ErrorCode(), |
| _send_block_callback->cntl_->ErrorText()), |
| -1); |
| if (_send_block_callback->cntl_->ErrorText().find("Reached timeout") != std::string::npos) { |
| LOG(WARNING) << "rpc failed, possibly caused by timeout. Increase BE config " |
| "`min_load_rpc_timeout_ms` to avoid this if you are sure that your " |
| "table building and data are reasonable."; |
| } |
| Status st = _index_channel->check_intolerable_failure(); |
| if (!st.ok()) { |
| _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); |
| } else if (ctx._is_last_rpc) { |
| // If this is the last rpc, we must set _add_batches_finished; otherwise the node channel's close_wait |
| // will be blocked. |
| _add_batches_finished = true; |
| } |
| } |
| |
| // When _cancelled is true, we still need to send a tablet_writer_cancel |
| // rpc request to truly release the load channel |
| void VNodeChannel::cancel(const std::string& cancel_msg) { |
| if (_is_closed) { |
| // skip channels that have already been cancelled or closed via close_wait. |
| return; |
| } |
| SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); |
| // set _is_closed to true finally |
| Defer set_closed {[&]() { |
| std::lock_guard<std::mutex> l(_closed_lock); |
| _is_closed = true; |
| }}; |
| // We don't need to wait for the last rpc to finish, because the closure's release/reset will join. |
| // But do we need brpc::StartCancel(call_id)? |
| _cancel_with_msg(cancel_msg); |
| // if not inited, _stub will be nullptr, skip sending cancel rpc |
| if (!_inited) { |
| return; |
| } |
| |
| auto request = std::make_shared<PTabletWriterCancelRequest>(); |
| request->set_allocated_id(&_parent->_load_id); |
| request->set_index_id(_index_channel->_index_id); |
| request->set_sender_id(_parent->_sender_id); |
| request->set_cancel_reason(cancel_msg); |
| |
| auto cancel_callback = DummyBrpcCallback<PTabletWriterCancelResult>::create_shared(); |
| auto closure = AutoReleaseClosure< |
| PTabletWriterCancelRequest, |
| DummyBrpcCallback<PTabletWriterCancelResult>>::create_unique(request, cancel_callback); |
| |
| auto remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; |
| if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { |
| remain_ms = config::min_load_rpc_timeout_ms; |
| } |
| cancel_callback->cntl_->set_timeout_ms(remain_ms); |
| if (config::tablet_writer_ignore_eovercrowded) { |
| closure->cntl_->ignore_eovercrowded(); |
| } |
| _stub->tablet_writer_cancel(closure->cntl_.get(), closure->request_.get(), |
| closure->response_.get(), closure.get()); |
| closure.release(); |
| static_cast<void>(request->release_id()); |
| } |
| |
| Status VNodeChannel::close_wait(RuntimeState* state, bool* is_closed) { |
| DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { |
| std::thread t(injection_full_gc_fn); |
| t.join(); |
| }); |
| SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); |
| |
| *is_closed = true; |
| |
| auto st = none_of({_cancelled, !_eos_is_produced}); |
| if (!st.ok()) { |
| if (_cancelled) { |
| std::lock_guard<std::mutex> l(_cancel_msg_lock); |
| return Status::Error<ErrorCode::INTERNAL_ERROR, false>("wait close failed. {}", |
| _cancel_msg); |
| } else { |
| return std::move( |
| st.prepend("already stopped, skip waiting for close. cancelled/!eos: ")); |
| } |
| } |
| |
| DBUG_EXECUTE_IF("VNodeChannel.close_wait.cancelled", { |
| _cancelled = true; |
| _cancel_msg = "injected cancel"; |
| }); |
| |
| if (state->is_cancelled()) { |
| _cancel_with_msg(state->cancel_reason().to_string()); |
| } |
| |
| // Wait until _add_batches_finished is changed by the rpc's finished callback. |
| // It may take a long time, so we can't set a timeout. |
| // For the pipeline engine, close is called in the async writer's process-block method, |
| // so it will not block the pipeline thread. |
| if (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { |
| *is_closed = false; |
| return Status::OK(); |
| } |
| VLOG_CRITICAL << _parent->_sender_id << " close wait finished"; |
| return Status::OK(); |
| } |
| |
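| // Called once close_wait() reports this channel as closed. On success, adds tablet commit infos |
| // to the runtime state, reports per-tablet received/filtered row counts to the index channel, |
| // and returns OK; otherwise returns the recorded cancel message. Timing counters are reported in both cases. |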
| Status VNodeChannel::after_close_handle( |
| RuntimeState* state, WriterStats* writer_stats, |
| std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map) { |
| Status st = Status::Error<ErrorCode::INTERNAL_ERROR, false>(get_cancel_msg()); |
| _close_time_ms = UnixMillis() - _close_time_ms; |
| |
| if (_add_batches_finished) { |
| _close_check(); |
| _state->add_tablet_commit_infos(_tablet_commit_infos); |
| |
| _index_channel->set_error_tablet_in_state(state); |
| _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id); |
| _index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, _node_id); |
| |
| std::lock_guard<std::mutex> l(_closed_lock); |
| // Only on a normal close do we set _is_closed to true here; |
| // otherwise it is set to true in cancel(). |
| _is_closed = true; |
| st = Status::OK(); |
| } |
| |
| time_report(node_add_batch_counter_map, writer_stats); |
| return st; |
| } |
| |
| Status VNodeChannel::check_status() { |
| return none_of({_cancelled, !_eos_is_produced}); |
| } |
| |
| void VNodeChannel::_close_check() { |
| std::lock_guard<std::mutex> lg(_pending_batches_lock); |
| CHECK(_pending_blocks.empty()) << name(); |
| CHECK(_cur_mutable_block == nullptr) << name(); |
| } |
| |
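| // Enqueue the final (possibly empty) block with eos=true so the sender thread delivers the |
| // end-of-stream signal; after this, no more blocks may be added to the channel. |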
| void VNodeChannel::mark_close(bool hang_wait) { |
| auto st = none_of({_cancelled, _eos_is_produced}); |
| if (!st.ok()) { |
| return; |
| } |
| |
| _cur_add_block_request->set_eos(true); |
| _cur_add_block_request->set_hang_wait(hang_wait); |
| { |
| std::lock_guard<std::mutex> l(_pending_batches_lock); |
| if (!_cur_mutable_block) [[unlikely]] { |
| // no block has ever arrived; add a dummy block |
| _cur_mutable_block = MutableBlock::create_unique(); |
| } |
| auto tmp_add_block_request = |
| std::make_shared<PTabletWriterAddBlockRequest>(*_cur_add_block_request); |
| // When preparing to close, add the block to the queue so that the try_send_pending_block thread will send it. |
| _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request); |
| _pending_batches_num++; |
| DCHECK(_pending_blocks.back().second->eos()); |
| _close_time_ms = UnixMillis(); |
| LOG(INFO) << channel_info() |
| << " mark closed, left pending batch size: " << _pending_blocks.size() |
| << " hang_wait: " << hang_wait; |
| } |
| |
| _eos_is_produced = true; |
| } |
| |
| VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, |
| std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep) |
| : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) { |
| _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc; |
| } |
| |
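| // Sender loop running in a background bthread (started in open()): periodically polls every |
| // node channel of every index channel and lets each submit its pending blocks, until all |
| // channels have finished or the writer is being closed. |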
| void VTabletWriter::_send_batch_process() { |
| SCOPED_TIMER(_non_blocking_send_timer); |
| SCOPED_ATTACH_TASK(_state); |
| SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
| |
| int sleep_time = int(config::olap_table_sink_send_interval_microseconds * |
| (_vpartition->is_auto_partition() |
| ? config::olap_table_sink_send_interval_auto_partition_factor |
| : 1)); |
| |
| while (true) { |
| // Incremental open will temporarily put channels into an abnormal state; stop checking while that happens. |
| std::unique_lock<bthread::Mutex> l(_stop_check_channel); |
| |
| int running_channels_num = 0; |
| int opened_nodes = 0; |
| for (const auto& index_channel : _channels) { |
| index_channel->for_each_node_channel([&running_channels_num, |
| this](const std::shared_ptr<VNodeChannel>& ch) { |
| // returns 0 if this channel has completed (or been cancelled), otherwise 1. |
| running_channels_num += |
| ch->try_send_and_fetch_status(_state, this->_send_batch_thread_pool_token); |
| }); |
| opened_nodes += index_channel->num_node_channels(); |
| } |
| |
| // An auto-partition table may temporarily have no node channels; wait for them to open. |
| if (opened_nodes != 0 && running_channels_num == 0) { |
| LOG(INFO) << "All node channels are stopped(maybe finished/offending/cancelled), " |
| "sender thread exit. " |
| << print_id(_load_id); |
| return; |
| } |
| |
| // For auto-partition tables there is a situation where we haven't opened any node channel but decide to cancel the task. |
| // Then the check above will never be true because opened_nodes won't increase, so we have to specially check whether close was called. |
| // We must RECHECK opened_nodes below, after getting the close signal, because it may have changed. Think of this: |
| // checked opened_nodes = 0 ---> new block arrived ---> task finished, close() was called ---> we got _try_close here |
| // if we don't check again, we may lose the last package. |
| if (_try_close.load(std::memory_order_acquire)) { |
| opened_nodes = 0; |
| std::ranges::for_each(_channels, |
| [&opened_nodes](const std::shared_ptr<IndexChannel>& ich) { |
| opened_nodes += ich->num_node_channels(); |
| }); |
| if (opened_nodes == 0) { |
| LOG(INFO) << "No node channel have ever opened but now we have to close. sender " |
| "thread exit. " |
| << print_id(_load_id); |
| return; |
| } |
| } |
| bthread_usleep(sleep_time); |
| } |
| } |
| |
| static void* periodic_send_batch(void* writer) { |
| auto* tablet_writer = (VTabletWriter*)(writer); |
| tablet_writer->_send_batch_process(); |
| return nullptr; |
| } |
| |
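| // Open all node channels: kick off open() on every node channel first, then open_wait() each of them. |
| // A node channel that fails to open has all of its tablets marked as failed; the load only fails here |
| // if an index channel reports an intolerable failure. Finally, start the background sender bthread. |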
| Status VTabletWriter::open(doris::RuntimeState* state, doris::RuntimeProfile* profile) { |
| RETURN_IF_ERROR(_init(state, profile)); |
| signal::set_signal_task_id(_load_id); |
| SCOPED_TIMER(profile->total_time_counter()); |
| SCOPED_TIMER(_open_timer); |
| SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); |
| |
| fmt::memory_buffer buf; |
| for (const auto& index_channel : _channels) { |
| fmt::format_to(buf, "index id:{}", index_channel->_index_id); |
| index_channel->for_each_node_channel( |
| [](const std::shared_ptr<VNodeChannel>& ch) { ch->open(); }); |
| } |
| VLOG_DEBUG << "list of open index id = " << fmt::to_string(buf); |
| |
| for (const auto& index_channel : _channels) { |
| index_channel->set_start_time(UnixMillis()); |
| index_channel->for_each_node_channel([&index_channel]( |
| const std::shared_ptr<VNodeChannel>& ch) { |
| auto st = ch->open_wait(); |
| if (!st.ok()) { |
| // The open() phase is mainly to generate DeltaWriter instances on the nodes corresponding to each node channel. |
| // This phase will not fail due to a single tablet. |
| // Therefore, if the open() phase fails, all tablets corresponding to the node need to be marked as failed. |
| index_channel->mark_as_failed( |
| ch.get(), |
| fmt::format("{}, open failed, err: {}", ch->channel_info(), st.to_string()), |
| -1); |
| } |
| }); |
| |
| RETURN_IF_ERROR(index_channel->check_intolerable_failure()); |
| } |
| _send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token( |
| ThreadPool::ExecutionMode::CONCURRENT, _send_batch_parallelism); |
| |
|     // start sending batches continually. this must be called after _init |
| if (bthread_start_background(&_sender_thread, nullptr, periodic_send_batch, (void*)this) != 0) { |
|         return Status::Error<ErrorCode::INTERNAL_ERROR>("bthread_start_background failed"); |
| } |
| return Status::OK(); |
| } |
| |
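| // Callback invoked after the FE has created new partitions for an auto-partition table: register the |
| // new tablet locations and nodes, then incrementally open node channels for the new partitions. |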
| Status VTabletWriter::on_partitions_created(TCreatePartitionResult* result) { |
|     // add new tablet locations. they will be referenced by address, so add them to the pool |
| auto* new_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets)); |
| _location->add_locations(*new_locations); |
| if (_write_single_replica) { |
| auto* slave_locations = _pool->add(new std::vector<TTabletLocation>(result->slave_tablets)); |
| _slave_location->add_locations(*slave_locations); |
| } |
| |
| // update new node info |
| _nodes_info->add_nodes(result->nodes); |
| |
|     // incrementally open node channels |
| RETURN_IF_ERROR(_incremental_open_node_channel(result->partitions)); |
| |
| return Status::OK(); |
| } |
| |
| static Status on_partitions_created(void* writer, TCreatePartitionResult* result) { |
| return static_cast<VTabletWriter*>(writer)->on_partitions_created(result); |
| } |
| |
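| // Wire up the row distribution helper with this writer's state, block convertor, tablet finder and |
| // partition info, including the callback used to create partitions on demand. |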
| Status VTabletWriter::_init_row_distribution() { |
| _row_distribution.init({.state = _state, |
| .block_convertor = _block_convertor.get(), |
| .tablet_finder = _tablet_finder.get(), |
| .vpartition = _vpartition, |
| .add_partition_request_timer = _add_partition_request_timer, |
| .txn_id = _txn_id, |
| .pool = _pool, |
| .location = _location, |
| .vec_output_expr_ctxs = &_vec_output_expr_ctxs, |
| .schema = _schema, |
| .caller = this, |
| .write_single_replica = _write_single_replica, |
| .create_partition_callback = &::doris::on_partitions_created}); |
| |
| return _row_distribution.open(_output_row_desc); |
| } |
| |
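| // One-time initialization from the thrift sink descriptor: parse the schema and partition info, create |
| // the block convertor and tablet finder, register the profile counters, build one IndexChannel per |
| // index, and finally initialize the row distribution. |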
| Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { |
| DCHECK(_t_sink.__isset.olap_table_sink); |
| _pool = state->obj_pool(); |
| auto& table_sink = _t_sink.olap_table_sink; |
| _load_id.set_hi(table_sink.load_id.hi); |
| _load_id.set_lo(table_sink.load_id.lo); |
| _txn_id = table_sink.txn_id; |
| _num_replicas = table_sink.num_replicas; |
| _tuple_desc_id = table_sink.tuple_id; |
| _write_file_cache = table_sink.write_file_cache; |
| _schema.reset(new OlapTableSchemaParam()); |
| RETURN_IF_ERROR(_schema->init(table_sink.schema)); |
| _schema->set_timestamp_ms(state->timestamp_ms()); |
| _schema->set_nano_seconds(state->nano_seconds()); |
| _schema->set_timezone(state->timezone()); |
| _location = _pool->add(new OlapTableLocationParam(table_sink.location)); |
| _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); |
| if (table_sink.__isset.write_single_replica && table_sink.write_single_replica) { |
| _write_single_replica = true; |
| _slave_location = _pool->add(new OlapTableLocationParam(table_sink.slave_location)); |
| if (!config::enable_single_replica_load) { |
| return Status::InternalError("single replica load is disabled on BE."); |
| } |
| } |
| |
| if (config::is_cloud_mode() && |
| (!table_sink.__isset.txn_timeout_s || table_sink.txn_timeout_s <= 0)) { |
| return Status::InternalError("The txn_timeout_s of TDataSink is invalid"); |
| } |
| _txn_expiration = ::time(nullptr) + table_sink.txn_timeout_s; |
| |
| if (table_sink.__isset.load_channel_timeout_s) { |
| _load_channel_timeout_s = table_sink.load_channel_timeout_s; |
| } else { |
| _load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; |
| } |
| if (table_sink.__isset.send_batch_parallelism && table_sink.send_batch_parallelism > 1) { |
| _send_batch_parallelism = table_sink.send_batch_parallelism; |
| } |
|     // if the distributed column list is empty, we know the tablet uses random distribution info. |
|     // in that case, if load_to_single_tablet is set and is true, we should find only one tablet in one |
|     // partition for the whole olap table sink |
| auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; |
| if (table_sink.partition.distributed_columns.empty()) { |
| if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { |
| find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK; |
| } else { |
| find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH; |
| } |
| } |
| _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition)); |
| _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode); |
| RETURN_IF_ERROR(_vpartition->init()); |
| |
| _state = state; |
| _operator_profile = profile; |
| |
| _sender_id = state->per_fragment_instance_idx(); |
| _num_senders = state->num_per_fragment_instances(); |
| _is_high_priority = |
| (state->execution_timeout() <= config::load_task_high_priority_threshold_second); |
| DBUG_EXECUTE_IF("VTabletWriter._init.is_high_priority", { _is_high_priority = true; }); |
|     // profile must be added to state's object pool |
| _mem_tracker = |
| std::make_shared<MemTracker>("OlapTableSink:" + std::to_string(state->load_job_id())); |
| SCOPED_TIMER(profile->total_time_counter()); |
| SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); |
| |
| // get table's tuple descriptor |
| _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); |
| if (_output_tuple_desc == nullptr) { |
| LOG(WARNING) << "unknown destination tuple descriptor, id=" << _tuple_desc_id; |
| return Status::InternalError("unknown destination tuple descriptor"); |
| } |
| |
| if (!_vec_output_expr_ctxs.empty() && |
| _output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) { |
| LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, " |
| << "output_tuple_slot_num " << _output_tuple_desc->slots().size() |
| << " output_expr_num " << _vec_output_expr_ctxs.size(); |
| return Status::InvalidArgument( |
| "output_tuple_slot_num {} should be equal to output_expr_num {}", |
| _output_tuple_desc->slots().size(), _vec_output_expr_ctxs.size()); |
| } |
| |
| _block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc); |
| // if partition_type is OLAP_TABLE_SINK_HASH_PARTITIONED, we handle the processing of auto_increment column |
| // on exchange node rather than on TabletWriter |
| _block_convertor->init_autoinc_info( |
| _schema->db_id(), _schema->table_id(), _state->batch_size(), |
| _schema->is_fixed_partial_update() && !_schema->auto_increment_coulumn().empty(), |
| _schema->auto_increment_column_unique_id()); |
| _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc)); |
| |
|     // add all counters |
| _input_rows_counter = ADD_COUNTER(profile, "RowsRead", TUnit::UNIT); |
| _output_rows_counter = ADD_COUNTER(profile, "RowsProduced", TUnit::UNIT); |
| _filtered_rows_counter = ADD_COUNTER(profile, "RowsFiltered", TUnit::UNIT); |
| _send_data_timer = ADD_TIMER(profile, "SendDataTime"); |
| _wait_mem_limit_timer = ADD_CHILD_TIMER(profile, "WaitMemLimitTime", "SendDataTime"); |
| _row_distribution_timer = ADD_CHILD_TIMER(profile, "RowDistributionTime", "SendDataTime"); |
| _filter_timer = ADD_CHILD_TIMER(profile, "FilterTime", "SendDataTime"); |
| _where_clause_timer = ADD_CHILD_TIMER(profile, "WhereClauseTime", "SendDataTime"); |
| _append_node_channel_timer = ADD_CHILD_TIMER(profile, "AppendNodeChannelTime", "SendDataTime"); |
| _add_partition_request_timer = |
| ADD_CHILD_TIMER(profile, "AddPartitionRequestTime", "SendDataTime"); |
| _validate_data_timer = ADD_TIMER(profile, "ValidateDataTime"); |
| _open_timer = ADD_TIMER(profile, "OpenTime"); |
| _close_timer = ADD_TIMER(profile, "CloseWaitTime"); |
| _non_blocking_send_timer = ADD_TIMER(profile, "NonBlockingSendTime"); |
| _non_blocking_send_work_timer = |
| ADD_CHILD_TIMER(profile, "NonBlockingSendWorkTime", "NonBlockingSendTime"); |
| _serialize_batch_timer = |
| ADD_CHILD_TIMER(profile, "SerializeBatchTime", "NonBlockingSendWorkTime"); |
| _total_add_batch_exec_timer = ADD_TIMER(profile, "TotalAddBatchExecTime"); |
| _max_add_batch_exec_timer = ADD_TIMER(profile, "MaxAddBatchExecTime"); |
| _total_wait_exec_timer = ADD_TIMER(profile, "TotalWaitExecTime"); |
| _max_wait_exec_timer = ADD_TIMER(profile, "MaxWaitExecTime"); |
| _add_batch_number = ADD_COUNTER(profile, "NumberBatchAdded", TUnit::UNIT); |
| _num_node_channels = ADD_COUNTER(profile, "NumberNodeChannels", TUnit::UNIT); |
| _load_back_pressure_version_time_ms = ADD_TIMER(profile, "LoadBackPressureVersionTimeMs"); |
| |
| #ifdef DEBUG |
| // check: tablet ids should be unique |
| { |
| std::unordered_set<int64_t> tablet_ids; |
| const auto& partitions = _vpartition->get_partitions(); |
| for (int i = 0; i < _schema->indexes().size(); ++i) { |
| for (const auto& partition : partitions) { |
| for (const auto& tablet : partition->indexes[i].tablets) { |
| CHECK(tablet_ids.count(tablet) == 0) << "found duplicate tablet id: " << tablet; |
| tablet_ids.insert(tablet); |
| } |
| } |
| } |
| } |
| #endif |
| |
| // open all channels |
| const auto& partitions = _vpartition->get_partitions(); |
| for (int i = 0; i < _schema->indexes().size(); ++i) { |
|         // collect all tablets belonging to this rollup |
| std::vector<TTabletWithPartition> tablets; |
| auto* index = _schema->indexes()[i]; |
| for (const auto& part : partitions) { |
| for (const auto& tablet : part->indexes[i].tablets) { |
| TTabletWithPartition tablet_with_partition; |
| tablet_with_partition.partition_id = part->id; |
| tablet_with_partition.tablet_id = tablet; |
| tablets.emplace_back(std::move(tablet_with_partition)); |
| _build_tablet_replica_info(tablet, part); |
| } |
| } |
| if (tablets.empty() && !_vpartition->is_auto_partition()) { |
| LOG(WARNING) << "load job:" << state->load_job_id() << " index: " << index->index_id |
| << " would open 0 tablet"; |
| } |
| _channels.emplace_back(new IndexChannel(this, index->index_id, index->where_clause)); |
| _index_id_to_channel[index->index_id] = _channels.back(); |
| RETURN_IF_ERROR(_channels.back()->init(state, tablets)); |
| } |
| |
| RETURN_IF_ERROR(_init_row_distribution()); |
| |
| _inited = true; |
| return Status::OK(); |
| } |
| |
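| // Add the tablets of newly created partitions to the existing index channels and open the corresponding |
| // node channels incrementally. Runs under _stop_check_channel so the sender loop does not observe the |
| // channels while they are in a half-open state. |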
| Status VTabletWriter::_incremental_open_node_channel( |
| const std::vector<TOlapTablePartition>& partitions) { |
|     // do what we did in prepare() for partitions. indexes, which don't change when we create new |
|     // partitions, are orthogonal to partitions. |
| std::unique_lock<bthread::Mutex> _l(_stop_check_channel); |
| for (int i = 0; i < _schema->indexes().size(); ++i) { |
| const OlapTableIndexSchema* index = _schema->indexes()[i]; |
| std::vector<TTabletWithPartition> tablets; |
| for (const auto& t_part : partitions) { |
| VOlapTablePartition* part = nullptr; |
| RETURN_IF_ERROR(_vpartition->generate_partition_from(t_part, part)); |
| for (const auto& tablet : part->indexes[i].tablets) { |
| TTabletWithPartition tablet_with_partition; |
| tablet_with_partition.partition_id = part->id; |
| tablet_with_partition.tablet_id = tablet; |
| tablets.emplace_back(std::move(tablet_with_partition)); |
| _build_tablet_replica_info(tablet, part); |
| } |
| DCHECK(!tablets.empty()) << "incremental open got nothing!"; |
| } |
| // update and reinit for existing channels. |
| std::shared_ptr<IndexChannel> channel = _index_id_to_channel[index->index_id]; |
| DCHECK(channel != nullptr); |
| RETURN_IF_ERROR(channel->init(_state, tablets, true)); // add tablets into it |
| } |
| |
| fmt::memory_buffer buf; |
| for (auto& channel : _channels) { |
|         // incrementally open the new partition's tablets on the storage side |
| channel->for_each_node_channel( |
| [](const std::shared_ptr<VNodeChannel>& ch) { ch->incremental_open(); }); |
| fmt::format_to(buf, "index id:{}", channel->_index_id); |
| VLOG_DEBUG << "list of open index id = " << fmt::to_string(buf); |
| |
| channel->for_each_node_channel([&channel](const std::shared_ptr<VNodeChannel>& ch) { |
| auto st = ch->open_wait(); |
| if (!st.ok()) { |
| // The open() phase is mainly to generate DeltaWriter instances on the nodes corresponding to each node channel. |
| // This phase will not fail due to a single tablet. |
| // Therefore, if the open() phase fails, all tablets corresponding to the node need to be marked as failed. |
| channel->mark_as_failed( |
| ch.get(), |
| fmt::format("{}, open failed, err: {}", ch->channel_info(), st.to_string()), |
| -1); |
| } |
| }); |
| |
| RETURN_IF_ERROR(channel->check_intolerable_failure()); |
| } |
| |
| return Status::OK(); |
| } |
| |
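| // Record the (total replica num, load required replica num) pair for a tablet, falling back to |
| // _num_replicas and (_num_replicas + 1) / 2 when the partition does not carry its own values, and |
| // remember any version-gap backends reported for the tablet. |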
| void VTabletWriter::_build_tablet_replica_info(const int64_t tablet_id, |
| VOlapTablePartition* partition) { |
| if (partition != nullptr) { |
| int total_replicas_num = |
| partition->total_replica_num == 0 ? _num_replicas : partition->total_replica_num; |
| int load_required_replicas_num = partition->load_required_replica_num == 0 |
| ? (_num_replicas + 1) / 2 |
| : partition->load_required_replica_num; |
| _tablet_replica_info.emplace( |
| tablet_id, std::make_pair(total_replicas_num, load_required_replicas_num)); |
| // Copy version gap backends info for this tablet |
| if (auto it = partition->tablet_version_gap_backends.find(tablet_id); |
| it != partition->tablet_version_gap_backends.end()) { |
| _tablet_version_gap_backends[tablet_id] = it->second; |
| } |
| } else { |
| _tablet_replica_info.emplace(tablet_id, |
| std::make_pair(_num_replicas, (_num_replicas + 1) / 2)); |
| } |
| } |
| |
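| // Cancel every node channel of every index channel with the given status. |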
| void VTabletWriter::_cancel_all_channel(Status status) { |
| for (const auto& index_channel : _channels) { |
| index_channel->for_each_node_channel([&status](const std::shared_ptr<VNodeChannel>& ch) { |
| ch->cancel(status.to_string()); |
| }); |
| } |
| LOG(INFO) << fmt::format( |
| "close olap table sink. load_id={}, txn_id={}, canceled all node channels due to " |
| "error: {}", |
| print_id(_load_id), _txn_id, status); |
| } |
| |
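| // Flush the rows that were batched while waiting for auto-created partitions: create the partitions, |
| // write the batched block through the normal write() path, then hand the borrowed columns back to the |
| // batching block. |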
| Status VTabletWriter::_send_new_partition_batch() { |
|     if (_row_distribution.need_deal_batching()) { // try_close may be called more than once |
| RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); |
| |
|         Block tmp_block = _row_distribution._batching_block->to_block(); // borrow it out as an lvalue ref |
| |
|         // this order matters: |
|         // 1. clear the batching stats (and the flag becomes true) so that we won't start a new batching |
|         //    process while dealing with the batched block. |
|         // 2. deal with the batched block |
|         // 3. reuse the columns of the lvalue block, because write() doesn't really adjust them; it |
|         //    generates a new block from them. |
| _row_distribution.clear_batching_stats(); |
| RETURN_IF_ERROR(this->write(_state, tmp_block)); |
| _row_distribution._batching_block->set_mutable_columns( |
|                 tmp_block.mutate_columns()); // recover the columns back |
| _row_distribution._batching_block->clear_column_data(); |
| _row_distribution._deal_batched = false; |
| } |
| return Status::OK(); |
| } |
| |
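| // First half of closing: flush any pending auto-partition batch, stop the sender loop, and mark-close |
| // the node channels (in two stages when incremental node channels exist). On any failure, all channels |
| // are cancelled and _close_status is recorded. |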
| void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status) { |
| SCOPED_TIMER(_close_timer); |
| Status status = exec_status; |
| |
|     // must happen before setting _try_close |
| if (status.ok()) { |
| SCOPED_TIMER(_operator_profile->total_time_counter()); |
| _row_distribution._deal_batched = true; |
| status = _send_new_partition_batch(); |
| } |
| |
| _try_close.store(true, std::memory_order_release); // will stop periodic thread |
| if (status.ok()) { |
| // BE id -> add_batch method counter |
| std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map; |
| |
|         // only if status is ok can we call _operator_profile->total_time_counter(). |
|         // if status is not ok, this sink may not be prepared, so the profile may be null |
| SCOPED_TIMER(_operator_profile->total_time_counter()); |
| for (const auto& index_channel : _channels) { |
|             // two-step mark close. first we send close_origin to the receivers to close all originally |
|             // existing TabletsChannels. once they are all closed, we are sure every Writer instance has |
|             // called _do_try_close, which means no new channel will be opened and the refcount of the |
|             // receivers is monotonically decreasing. then we are safe to close all our channels. |
| if (index_channel->has_incremental_node_channel()) { |
| if (!status.ok()) { |
| break; |
| } |
| VLOG_TRACE << _sender_id << " first stage close start " << _txn_id; |
| index_channel->for_init_node_channel( |
| [&index_channel, &status, this](const std::shared_ptr<VNodeChannel>& ch) { |
| if (!status.ok() || ch->is_closed()) { |
| return; |
| } |
| VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host() |
|                                        << " mark close1 for inits " << _txn_id; |
| ch->mark_close(true); |
| if (ch->is_cancelled()) { |
| status = cancel_channel_and_check_intolerable_failure( |
| std::move(status), ch->get_cancel_msg(), *index_channel, |
| *ch); |
| } |
| }); |
| if (!status.ok()) { |
| break; |
| } |
|                 // No need to wait after quorum success: |
|                 // the first-stage close_wait only ensures the incremental node channels' load has completed; |
|                 // unified waiting happens in the second-stage close_wait. |
| status = index_channel->close_wait(_state, nullptr, nullptr, |
| index_channel->init_node_channel_ids(), false); |
| if (!status.ok()) { |
| break; |
| } |
|                 VLOG_DEBUG << _sender_id << " first stage finished. closing inc nodes " << _txn_id; |
| index_channel->for_inc_node_channel( |
| [&index_channel, &status, this](const std::shared_ptr<VNodeChannel>& ch) { |
| if (!status.ok() || ch->is_closed()) { |
| return; |
| } |
|                             // only on the first try_close will all node channels mark_close() |
| VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host() |
|                                        << " mark close2 for inc " << _txn_id; |
| ch->mark_close(); |
| if (ch->is_cancelled()) { |
| status = cancel_channel_and_check_intolerable_failure( |
| std::move(status), ch->get_cancel_msg(), *index_channel, |
| *ch); |
| } |
| }); |
| } else { // not has_incremental_node_channel |
| VLOG_TRACE << _sender_id << " has no incremental channels " << _txn_id; |
| index_channel->for_each_node_channel( |
| [&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) { |
| if (!status.ok() || ch->is_closed()) { |
| return; |
| } |
|                             // only on the first try_close will all node channels mark_close() |
| ch->mark_close(); |
| if (ch->is_cancelled()) { |
| status = cancel_channel_and_check_intolerable_failure( |
| std::move(status), ch->get_cancel_msg(), *index_channel, |
| *ch); |
| } |
| }); |
| } |
| } // end for index channels |
| } |
| |
| if (!status.ok()) { |
| _cancel_all_channel(status); |
| _close_status = status; |
| } |
| } |
| |
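| // Second half of closing: wait for all node channels to finish, collect statistics into the profile |
| // and runtime state, join the sender bthread, and release the node channels' blocks. |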
| Status VTabletWriter::close(Status exec_status) { |
| if (!_inited) { |
| DCHECK(!exec_status.ok()); |
| _cancel_all_channel(exec_status); |
| _close_status = exec_status; |
| return _close_status; |
| } |
| |
| SCOPED_TIMER(_close_timer); |
| SCOPED_TIMER(_operator_profile->total_time_counter()); |
| |
|     // this sends the last batch of requests; close_wait will wait for it to finish. |
| _do_try_close(_state, exec_status); |
| TEST_INJECTION_POINT("VOlapTableSink::close"); |
| |
| DBUG_EXECUTE_IF("VTabletWriter.close.sleep", { |
| auto sleep_sec = DebugPoints::instance()->get_debug_param_or_default<int32_t>( |
| "VTabletWriter.close.sleep", "sleep_sec", 1); |
| std::this_thread::sleep_for(std::chrono::seconds(sleep_sec)); |
| }); |
| DBUG_EXECUTE_IF("VTabletWriter.close.close_status_not_ok", |
| { _close_status = Status::InternalError("injected close status not ok"); }); |
| |
| // If _close_status is not ok, all nodes have been canceled in try_close. |
| if (_close_status.ok()) { |
| auto status = Status::OK(); |
| // BE id -> add_batch method counter |
| std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map; |
| WriterStats writer_stats; |
| |
| for (const auto& index_channel : _channels) { |
| if (!status.ok()) { |
| break; |
| } |
| int64_t add_batch_exec_time = 0; |
| int64_t wait_exec_time = 0; |
| status = index_channel->close_wait(_state, &writer_stats, &node_add_batch_counter_map, |
| index_channel->each_node_channel_ids(), true); |
| |
|             // Due to the non-determinism of compaction, the rowsets of each replica may be different from each other on different |
|             // BE nodes. The number of rows filtered in SegmentWriter depends on the historical rowsets located in the corresponding |
|             // BE node. So we check the number of rows filtered on each successful BE to ensure the consistency of the current load. |
| if (status.ok() && !_write_single_replica && _schema->is_strict_mode() && |
| _schema->is_partial_update()) { |
| if (Status st = index_channel->check_tablet_filtered_rows_consistency(); !st.ok()) { |
| status = st; |
| } else { |
| _state->set_num_rows_filtered_in_strict_mode_partial_update( |
| index_channel->num_rows_filtered()); |
| } |
| } |
| |
| writer_stats.num_node_channels += index_channel->num_node_channels(); |
| writer_stats.max_add_batch_exec_time_ns = |
| std::max(add_batch_exec_time, writer_stats.max_add_batch_exec_time_ns); |
| writer_stats.max_wait_exec_time_ns = |
| std::max(wait_exec_time, writer_stats.max_wait_exec_time_ns); |
| } // end for index channels |
| |
| if (status.ok()) { |
| // TODO need to be improved |
| LOG(INFO) << "total mem_exceeded_block_ns=" |
| << writer_stats.channel_stat.mem_exceeded_block_ns |
| << ", total queue_push_lock_ns=" << writer_stats.queue_push_lock_ns |
| << ", total actual_consume_ns=" << writer_stats.actual_consume_ns |
| << ", load id=" << print_id(_load_id) << ", txn_id=" << _txn_id; |
| |
| COUNTER_SET(_input_rows_counter, _number_input_rows); |
| COUNTER_SET(_output_rows_counter, _number_output_rows); |
| COUNTER_SET(_filtered_rows_counter, |
| _block_convertor->num_filtered_rows() + |
| _tablet_finder->num_filtered_rows() + |
| _state->num_rows_filtered_in_strict_mode_partial_update()); |
| COUNTER_SET(_send_data_timer, _send_data_ns); |
| COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); |
| COUNTER_SET(_filter_timer, _filter_ns); |
| COUNTER_SET(_append_node_channel_timer, |
| writer_stats.channel_stat.append_node_channel_ns); |
| COUNTER_SET(_where_clause_timer, writer_stats.channel_stat.where_clause_ns); |
| COUNTER_SET(_wait_mem_limit_timer, writer_stats.channel_stat.mem_exceeded_block_ns); |
| COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); |
| COUNTER_SET(_serialize_batch_timer, writer_stats.serialize_batch_ns); |
| COUNTER_SET(_non_blocking_send_work_timer, writer_stats.actual_consume_ns); |
| COUNTER_SET(_total_add_batch_exec_timer, writer_stats.total_add_batch_exec_time_ns); |
| COUNTER_SET(_max_add_batch_exec_timer, writer_stats.max_add_batch_exec_time_ns); |
| COUNTER_SET(_total_wait_exec_timer, writer_stats.total_wait_exec_time_ns); |
| COUNTER_SET(_max_wait_exec_timer, writer_stats.max_wait_exec_time_ns); |
| COUNTER_SET(_add_batch_number, writer_stats.total_add_batch_num); |
| COUNTER_SET(_num_node_channels, writer_stats.num_node_channels); |
| COUNTER_SET(_load_back_pressure_version_time_ms, |
| writer_stats.load_back_pressure_version_time_ms); |
| g_sink_load_back_pressure_version_time_ms |
| << writer_stats.load_back_pressure_version_time_ms; |
| |
|             // _number_input_rows doesn't contain num_rows_load_filtered and num_rows_load_unselected in the scan node |
| int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() + |
| _state->num_rows_load_unselected(); |
| _state->set_num_rows_load_total(num_rows_load_total); |
| _state->update_num_rows_load_filtered( |
| _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() + |
| _state->num_rows_filtered_in_strict_mode_partial_update()); |
| _state->update_num_rows_load_unselected( |
| _tablet_finder->num_immutable_partition_filtered_rows()); |
| |
| if (_state->enable_profile() && _state->profile_level() >= 2) { |
| // Output detailed profiling info for auto-partition requests |
| _row_distribution.output_profile_info(_operator_profile); |
| } |
| |
|             // print a log of the add batch time of all nodes, to make tracing load performance easier |
| std::stringstream ss; |
| ss << "finished to close olap table sink. load_id=" << print_id(_load_id) |
| << ", txn_id=" << _txn_id |
| << ", node add batch time(ms)/wait execution time(ms)/close time(ms)/num: "; |
| for (auto const& pair : node_add_batch_counter_map) { |
| ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000) |
| << ")(" << (pair.second.add_batch_wait_execution_time_us / 1000) << ")(" |
| << pair.second.close_wait_time_ms << ")(" << pair.second.add_batch_num << ")} "; |
| } |
| LOG(INFO) << ss.str(); |
| } else { |
| _cancel_all_channel(status); |
| } |
| _close_status = status; |
| } |
| |
|     // Sender join() must be placed after node channels mark_close/cancel. |
|     // But there is no specific order required between sender join() & close_wait(). |
| if (_sender_thread) { |
| bthread_join(_sender_thread, nullptr); |
|         // We have to wait for all tasks in _send_batch_thread_pool_token to finish, |
|         // because it is difficult to handle concurrency problems if we just |
|         // shut it down. |
| _send_batch_thread_pool_token->wait(); |
| } |
| |
|     // We clear the NodeChannels' batches here because destroying the NodeChannels' batches uses |
|     // OlapTableSink::_mem_tracker and its parents, |
|     // but their destruction happens after OlapTableSink's. |
| for (const auto& index_channel : _channels) { |
| index_channel->for_each_node_channel( |
| [](const std::shared_ptr<VNodeChannel>& ch) { ch->clear_all_blocks(); }); |
| } |
| return _close_status; |
| } |
| |
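| // For one index, group rows by the node channels that host their target tablets: each node channel |
| // that hosts a row's tablet receives that row id in its selector plus the matching tablet id. |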
| void VTabletWriter::_generate_one_index_channel_payload( |
| RowPartTabletIds& row_part_tablet_id, int32_t index_idx, |
| ChannelDistributionPayload& channel_payload) { |
| auto& row_ids = row_part_tablet_id.row_ids; |
| auto& tablet_ids = row_part_tablet_id.tablet_ids; |
| |
| size_t row_cnt = row_ids.size(); |
| |
| for (size_t i = 0; i < row_ids.size(); i++) { |
|         // (tablet_id, VNodeChannel) where this tablet is located |
| auto it = _channels[index_idx]->_channels_by_tablet.find(tablet_ids[i]); |
| DCHECK(it != _channels[index_idx]->_channels_by_tablet.end()) |
| << "unknown tablet, tablet_id=" << tablet_ids[i]; |
| |
| std::vector<std::shared_ptr<VNodeChannel>>& tablet_locations = it->second; |
| for (const auto& locate_node : tablet_locations) { |
| auto payload_it = channel_payload.find(locate_node.get()); // <VNodeChannel*, Payload> |
| if (payload_it == channel_payload.end()) { |
| auto [tmp_it, _] = channel_payload.emplace( |
| locate_node.get(), |
| Payload {std::make_unique<IColumn::Selector>(), std::vector<int64_t>()}); |
| payload_it = tmp_it; |
| payload_it->second.first->reserve(row_cnt); |
| payload_it->second.second.reserve(row_cnt); |
| } |
| payload_it->second.first->push_back(row_ids[i]); |
| payload_it->second.second.push_back(tablet_ids[i]); |
| } |
| } |
| } |
| |
| void VTabletWriter::_generate_index_channels_payloads( |
| std::vector<RowPartTabletIds>& row_part_tablet_ids, |
| ChannelDistributionPayloadVec& payload) { |
| for (int i = 0; i < _schema->indexes().size(); i++) { |
| _generate_one_index_channel_payload(row_part_tablet_ids[i], i, payload[i]); |
| } |
| } |
| |
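| // Per-block entry point: distribute the rows to partitions/tablets, build the per-node payloads and |
| // append the block to each node channel. A failed add_block only marks that node channel as failed; |
| // the write fails only when an index channel reports an intolerable failure. |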
| Status VTabletWriter::write(RuntimeState* state, doris::Block& input_block) { |
| SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); |
| Status status = Status::OK(); |
| |
| DCHECK(_state); |
| DCHECK(_state->query_options().__isset.dry_run_query); |
| if (_state->query_options().dry_run_query) { |
| return status; |
| } |
| |
| // check out of limit |
| RETURN_IF_ERROR(_send_new_partition_batch()); |
| |
| auto rows = input_block.rows(); |
| auto bytes = input_block.bytes(); |
| if (UNLIKELY(rows == 0)) { |
| return status; |
| } |
| SCOPED_TIMER(_operator_profile->total_time_counter()); |
| SCOPED_RAW_TIMER(&_send_data_ns); |
| |
| std::shared_ptr<Block> block; |
| _number_input_rows += rows; |
|     // update incrementally so that the FE can get the progress. |
|     // the real 'num_rows_load_total' will be set when the sink is closed. |
| _state->update_num_rows_load_total(rows); |
| _state->update_num_bytes_load_total(bytes); |
| DorisMetrics::instance()->load_rows->increment(rows); |
| DorisMetrics::instance()->load_bytes->increment(bytes); |
| |
| _row_distribution_watch.start(); |
| RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( |
| input_block, block, _row_part_tablet_ids, _number_input_rows)); |
| |
| ChannelDistributionPayloadVec channel_to_payload; |
| |
| channel_to_payload.resize(_channels.size()); |
| _generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload); |
| _row_distribution_watch.stop(); |
| |
| // Add block to node channel |
| for (size_t i = 0; i < _channels.size(); i++) { |
| for (const auto& entry : channel_to_payload[i]) { |
|             // if this node channel has already failed, this add_block will be skipped |
|             // entry.second is a [row -> tablet] mapping |
| auto st = entry.first->add_block(block.get(), &entry.second); |
| if (!st.ok()) { |
| _channels[i]->mark_as_failed(entry.first, st.to_string()); |
| } |
| } |
| } |
| |
| // check intolerable failure |
| for (const auto& index_channel : _channels) { |
| RETURN_IF_ERROR(index_channel->check_intolerable_failure()); |
| } |
| |
| g_sink_write_bytes << bytes; |
| g_sink_write_rows << rows; |
| return Status::OK(); |
| } |
| |
| } // namespace doris |