blob: acab5e12935ae3389709ef91862408ee89460103 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/tablets_channel.h"
#include <bvar/bvar.h>
#include <fmt/format.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <ctime>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <initializer_list>
#include <set>
#include <thread>
#include <utility>
#ifdef DEBUG
#include <unordered_set>
#endif
#include "common/logging.h"
#include "exec/tablet_info.h"
#include "olap/delta_writer.h"
#include "olap/storage_engine.h"
#include "olap/txn_manager.h"
#include "runtime/load_channel.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "vec/core/block.h"
namespace doris {
class SlotDescriptor;
bvar::Adder<int64_t> g_tablets_channel_send_data_allocated_size(
"tablets_channel_send_data_allocated_size");
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count;
BaseTabletsChannel::BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id,
bool is_high_priority, RuntimeProfile* profile)
: _key(key),
_state(kInitialized),
_load_id(load_id),
_closed_senders(64),
_is_high_priority(is_high_priority) {
static std::once_flag once_flag;
_init_profile(profile);
std::call_once(once_flag, [] {
REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); });
});
}
TabletsChannel::TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key,
const UniqueId& load_id, bool is_high_priority,
RuntimeProfile* profile)
: BaseTabletsChannel(key, load_id, is_high_priority, profile), _engine(engine) {}
BaseTabletsChannel::~BaseTabletsChannel() {
_s_tablet_writer_count -= _tablet_writers.size();
}
TabletsChannel::~TabletsChannel() = default;
Status BaseTabletsChannel::_get_current_seq(int64_t& cur_seq,
const PTabletWriterAddBlockRequest& request) {
std::lock_guard<std::mutex> l(_lock);
if (_state != kOpened) {
return _state == kFinished ? _close_status
: Status::InternalError("TabletsChannel {} state: {}",
_key.to_string(), _state);
}
cur_seq = _next_seqs[request.sender_id()];
// check packet
if (request.packet_seq() > cur_seq) {
LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
<< ", recept_seq=" << request.packet_seq();
return Status::InternalError("lost data packet");
}
return Status::OK();
}
void BaseTabletsChannel::_init_profile(RuntimeProfile* profile) {
_profile =
profile->create_child(fmt::format("TabletsChannel {}", _key.to_string()), true, true);
_add_batch_number_counter = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT);
auto* memory_usage = _profile->create_child("PeakMemoryUsage", true, true);
_add_batch_timer = ADD_TIMER(_profile, "AddBatchTime");
_write_block_timer = ADD_TIMER(_profile, "WriteBlockTime");
_incremental_open_timer = ADD_TIMER(_profile, "IncrementalOpenTabletTime");
_memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Total", TUnit::BYTES);
_write_memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Write", TUnit::BYTES);
_flush_memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Flush", TUnit::BYTES);
_max_tablet_memory_usage_counter =
memory_usage->AddHighWaterMarkCounter("MaxTablet", TUnit::BYTES);
_max_tablet_write_memory_usage_counter =
memory_usage->AddHighWaterMarkCounter("MaxTabletWrite", TUnit::BYTES);
_max_tablet_flush_memory_usage_counter =
memory_usage->AddHighWaterMarkCounter("MaxTabletFlush", TUnit::BYTES);
}
void TabletsChannel::_init_profile(RuntimeProfile* profile) {
BaseTabletsChannel::_init_profile(profile);
_slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
}
Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
std::lock_guard<std::mutex> l(_lock);
// if _state is kOpened, it's a normal case, already open by other sender
// if _state is kFinished, already cancelled by other sender
if (_state == kOpened || _state == kFinished) {
return Status::OK();
}
_txn_id = request.txn_id();
_index_id = request.index_id();
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request.schema()));
_tuple_desc = _schema->tuple_desc();
int max_sender = request.num_senders();
/*
* a tablets channel in reciever is related to a bulk of VNodeChannel of sender. each instance one or none.
* there are two possibilities:
* 1. there's partitions originally broadcasted by FE. so all sender(instance) know it at start. and open() will be
* called directly, not by incremental_open(). and after _state changes to kOpened. _open_by_incremental will never
* be true. in this case, _num_remaining_senders will keep same with senders number. when all sender sent close rpc,
* the tablets channel will close. and if for auto partition table, these channel's closing will hang on reciever and
* return together to avoid close-then-incremental-open problem.
* 2. this tablets channel is opened by incremental_open of sender's sink node. so only this sender will know this partition
* (this TabletsChannel) at that time. and we are not sure how many sender will know in the end. it depends on data
* distribution. in this situation open() is called by incremental_open() at first time. so _open_by_incremental is true.
* then _num_remaining_senders will not be set here. but inc every time when incremental_open() called. so it's dynamic
* and also need same number of senders' close to close. but will not hang.
*/
if (_open_by_incremental) {
DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
} else {
_num_remaining_senders = max_sender;
}
LOG(INFO) << fmt::format(
"open tablets channel {}, tablets num: {} timeout(s): {}, init senders {} with "
"incremental {}",
_key.to_string(), request.tablets().size(), request.load_channel_timeout_s(),
_num_remaining_senders, _open_by_incremental ? "on" : "off");
// just use max_sender no matter incremental or not cuz we dont know how many senders will open.
_next_seqs.resize(max_sender, 0);
_closed_senders.Reset(max_sender);
RETURN_IF_ERROR(_open_all_writers(request));
_state = kOpened;
return Status::OK();
}
Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) {
SCOPED_TIMER(_incremental_open_timer);
// current node first opened by incremental open
if (_state == kInitialized) {
_open_by_incremental = true;
RETURN_IF_ERROR(open(params));
}
std::lock_guard<std::mutex> l(_lock);
// one sender may incremental_open many times. but only close one time. so dont count duplicately.
if (_open_by_incremental) {
if (params.has_sender_id() && !_recieved_senders.contains(params.sender_id())) {
_recieved_senders.insert(params.sender_id());
_num_remaining_senders++;
} else if (!params.has_sender_id()) { // for compatible
_num_remaining_senders++;
}
VLOG_DEBUG << fmt::format("txn {}: TabletsChannel {} inc senders to {}", _txn_id, _index_id,
_num_remaining_senders);
}
std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
for (const auto& index : _schema->indexes()) {
if (index->index_id == _index_id) {
index_slots = &index->slots;
schema_hash = index->schema_hash;
break;
}
}
if (index_slots == nullptr) {
return Status::InternalError("unknown index id, key={}", _key.to_string());
}
// update tablets
size_t incremental_tablet_num = 0;
std::stringstream ss;
ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id())
<< " incremental open delta writer: ";
// every change will hold _lock. this find in under _lock too. so no need _tablet_writers_lock again.
for (const auto& tablet : params.tablets()) {
if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
continue;
}
incremental_tablet_num++;
WriteRequest wrequest;
wrequest.index_id = params.index_id();
wrequest.tablet_id = tablet.tablet_id();
wrequest.schema_hash = schema_hash;
wrequest.txn_id = _txn_id;
wrequest.partition_id = tablet.partition_id();
wrequest.load_id = params.id();
wrequest.tuple_desc = _tuple_desc;
wrequest.slots = index_slots;
wrequest.is_high_priority = _is_high_priority;
wrequest.table_schema_param = _schema;
wrequest.txn_expiration = params.txn_expiration(); // Required by CLOUD.
wrequest.storage_vault_id = params.storage_vault_id();
auto delta_writer = create_delta_writer(wrequest);
{
// here we modify _tablet_writers. so need lock.
std::lock_guard<std::mutex> l(_tablet_writers_lock);
_tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer));
}
ss << "[" << tablet.tablet_id() << "]";
}
_s_tablet_writer_count += incremental_tablet_num;
LOG(INFO) << ss.str();
_state = kOpened;
return Status::OK();
}
std::unique_ptr<BaseDeltaWriter> TabletsChannel::create_delta_writer(const WriteRequest& request) {
return std::make_unique<DeltaWriter>(_engine, request, _profile, _load_id);
}
Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
PTabletWriterAddBlockResult* res, bool* finished) {
int sender_id = req.sender_id();
int64_t backend_id = req.backend_id();
const auto& partition_ids = req.partition_ids();
auto* tablet_errors = res->mutable_tablet_errors();
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
return _close_status;
}
if (_closed_senders.Get(sender_id)) {
// Double close from one sender, just return OK
*finished = (_num_remaining_senders == 0);
return _close_status;
}
for (auto pid : partition_ids) {
_partition_ids.emplace(pid);
}
_closed_senders.Set(sender_id, true);
_num_remaining_senders--;
*finished = (_num_remaining_senders == 0);
LOG(INFO) << fmt::format(
"txn {}: close tablets channel of index {} , sender id: {}, backend {}, remain "
"senders: {}",
_txn_id, _index_id, sender_id, backend_id, _num_remaining_senders);
if (!*finished) {
return Status::OK();
}
_state = kFinished;
// All senders are closed
// 1. close all delta writers
std::set<DeltaWriter*> need_wait_writers;
// under _lock. no need _tablet_writers_lock again.
for (auto&& [tablet_id, writer] : _tablet_writers) {
if (_partition_ids.contains(writer->partition_id())) {
auto st = writer->close();
if (!st.ok()) {
auto err_msg = fmt::format(
"close tablet writer failed, tablet_id={}, "
"transaction_id={}, err={}",
tablet_id, _txn_id, st.to_string());
LOG(WARNING) << err_msg;
PTabletError* tablet_error = tablet_errors->Add();
tablet_error->set_tablet_id(tablet_id);
tablet_error->set_msg(st.to_string());
// just skip this tablet(writer) and continue to close others
continue;
}
// tablet writer in `_broken_tablets` should not call `build_rowset` and
// `commit_txn` method, after that, the publish-version task will success,
// which can cause the replica inconsistency.
if (_is_broken_tablet(writer->tablet_id())) {
LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled"
<< ", tablet_id=" << tablet_id << ", transaction_id=" << _txn_id;
continue;
}
need_wait_writers.insert(static_cast<DeltaWriter*>(writer.get()));
} else {
auto st = writer->cancel();
if (!st.ok()) {
LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << tablet_id
<< ", transaction_id=" << _txn_id;
// just skip this tablet(writer) and continue to close others
continue;
}
VLOG_PROGRESS << "cancel tablet writer successfully, tablet_id=" << tablet_id
<< ", transaction_id=" << _txn_id;
}
}
_write_single_replica = req.write_single_replica();
// 2. wait all writer finished flush.
for (auto* writer : need_wait_writers) {
RETURN_IF_ERROR((writer->wait_flush()));
}
// 3. build rowset
for (auto it = need_wait_writers.begin(); it != need_wait_writers.end();) {
Status st = (*it)->build_rowset();
if (!st.ok()) {
_add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
it = need_wait_writers.erase(it);
continue;
}
// 3.1 calculate delete bitmap for Unique Key MoW tables
st = (*it)->submit_calc_delete_bitmap_task();
if (!st.ok()) {
_add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
it = need_wait_writers.erase(it);
continue;
}
it++;
}
// 4. wait for delete bitmap calculation complete if necessary
for (auto it = need_wait_writers.begin(); it != need_wait_writers.end();) {
Status st = (*it)->wait_calc_delete_bitmap();
if (!st.ok()) {
_add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
it = need_wait_writers.erase(it);
continue;
}
it++;
}
// 5. commit all writers
for (auto* writer : need_wait_writers) {
// close may return failed, but no need to handle it here.
// tablet_vec will only contains success tablet, and then let FE judge it.
_commit_txn(writer, req, res);
}
if (_write_single_replica) {
auto* success_slave_tablet_node_ids = res->mutable_success_slave_tablet_node_ids();
// The operation waiting for all slave replicas to complete must end before the timeout,
// so that there is enough time to collect completed replica. Otherwise, the task may
// timeout and fail even though most of the replicas are completed. Here we set 0.9
// times the timeout as the maximum waiting time.
SCOPED_TIMER(_slave_replica_timer);
while (!need_wait_writers.empty() &&
(time(nullptr) - parent->last_updated_time()) < (parent->timeout() * 0.9)) {
std::set<DeltaWriter*>::iterator it;
for (it = need_wait_writers.begin(); it != need_wait_writers.end();) {
bool is_done = (*it)->check_slave_replicas_done(success_slave_tablet_node_ids);
if (is_done) {
need_wait_writers.erase(it++);
} else {
it++;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
for (auto* writer : need_wait_writers) {
writer->add_finished_slave_replicas(success_slave_tablet_node_ids);
}
_engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
}
return Status::OK();
}
void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req,
PTabletWriterAddBlockResult* res) {
PSlaveTabletNodes slave_nodes;
if (_write_single_replica) {
auto& nodes_map = req.slave_tablet_nodes();
auto it = nodes_map.find(writer->tablet_id());
if (it != nodes_map.end()) {
slave_nodes = it->second;
}
}
Status st = writer->commit_txn(slave_nodes);
if (st.ok()) [[likely]] {
auto* tablet_vec = res->mutable_tablet_vec();
PTabletInfo* tablet_info = tablet_vec->Add();
tablet_info->set_tablet_id(writer->tablet_id());
// unused required field.
tablet_info->set_schema_hash(0);
tablet_info->set_received_rows(writer->total_received_rows());
tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
_total_received_rows += writer->total_received_rows();
_num_rows_filtered += writer->num_rows_filtered();
} else {
_add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), st);
}
}
void BaseTabletsChannel::_add_error_tablet(
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, int64_t tablet_id,
Status error) const {
PTabletError* tablet_error = tablet_errors->Add();
tablet_error->set_tablet_id(tablet_id);
tablet_error->set_msg(error.to_string());
VLOG_PROGRESS << "close wait failed tablet " << tablet_id << " transaction_id " << _txn_id
<< "err msg " << error;
}
void BaseTabletsChannel::refresh_profile() {
int64_t write_mem_usage = 0;
int64_t flush_mem_usage = 0;
int64_t max_tablet_mem_usage = 0;
int64_t max_tablet_write_mem_usage = 0;
int64_t max_tablet_flush_mem_usage = 0;
{
std::lock_guard<std::mutex> l(_tablet_writers_lock);
for (auto&& [tablet_id, writer] : _tablet_writers) {
int64_t write_mem = writer->mem_consumption(MemType::WRITE_FINISHED);
write_mem_usage += write_mem;
int64_t flush_mem = writer->mem_consumption(MemType::FLUSH);
flush_mem_usage += flush_mem;
if (write_mem > max_tablet_write_mem_usage) {
max_tablet_write_mem_usage = write_mem;
}
if (flush_mem > max_tablet_flush_mem_usage) {
max_tablet_flush_mem_usage = flush_mem;
}
if (write_mem + flush_mem > max_tablet_mem_usage) {
max_tablet_mem_usage = write_mem + flush_mem;
}
}
}
COUNTER_SET(_memory_usage_counter, write_mem_usage + flush_mem_usage);
COUNTER_SET(_write_memory_usage_counter, write_mem_usage);
COUNTER_SET(_flush_memory_usage_counter, flush_mem_usage);
COUNTER_SET(_max_tablet_memory_usage_counter, max_tablet_mem_usage);
COUNTER_SET(_max_tablet_write_memory_usage_counter, max_tablet_write_mem_usage);
COUNTER_SET(_max_tablet_flush_memory_usage_counter, max_tablet_flush_mem_usage);
}
Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
for (const auto& index : _schema->indexes()) {
if (index->index_id == _index_id) {
index_slots = &index->slots;
schema_hash = index->schema_hash;
break;
}
}
if (index_slots == nullptr) {
return Status::InternalError("unknown index id, key={}", _key.to_string());
}
#ifdef DEBUG
// check: tablet ids should be unique
{
std::unordered_set<int64_t> tablet_ids;
for (const auto& tablet : request.tablets()) {
CHECK(tablet_ids.count(tablet.tablet_id()) == 0)
<< "found duplicate tablet id: " << tablet.tablet_id();
tablet_ids.insert(tablet.tablet_id());
}
}
#endif
int tablet_cnt = 0;
// under _lock. no need _tablet_writers_lock again.
for (const auto& tablet : request.tablets()) {
if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
continue;
}
tablet_cnt++;
WriteRequest wrequest {
.tablet_id = tablet.tablet_id(),
.schema_hash = schema_hash,
.txn_id = _txn_id,
.txn_expiration = request.txn_expiration(), // Required by CLOUD.
.index_id = request.index_id(),
.partition_id = tablet.partition_id(),
.load_id = request.id(),
.tuple_desc = _tuple_desc,
.slots = index_slots,
.table_schema_param = _schema,
.is_high_priority = _is_high_priority,
.write_file_cache = request.write_file_cache(),
.storage_vault_id = request.storage_vault_id(),
};
auto delta_writer = create_delta_writer(wrequest);
{
std::lock_guard<std::mutex> l(_tablet_writers_lock);
_tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer));
}
}
_s_tablet_writer_count += _tablet_writers.size();
DCHECK_EQ(_tablet_writers.size(), tablet_cnt);
return Status::OK();
}
Status BaseTabletsChannel::cancel() {
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
return _close_status;
}
for (auto& it : _tablet_writers) {
static_cast<void>(it.second->cancel());
}
_state = kFinished;
return Status::OK();
}
Status TabletsChannel::cancel() {
RETURN_IF_ERROR(BaseTabletsChannel::cancel());
if (_write_single_replica) {
_engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
}
return Status::OK();
}
std::string TabletsChannelKey::to_string() const {
std::stringstream ss;
ss << *this;
return ss.str();
}
std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) {
os << "(load_id=" << key.id << ", index_id=" << key.index_id << ")";
return os;
}
Status BaseTabletsChannel::_write_block_data(
const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs,
PTabletWriterAddBlockResult* response) {
vectorized::Block send_data;
RETURN_IF_ERROR(send_data.deserialize(request.block()));
CHECK(send_data.rows() == request.tablet_ids_size())
<< "block rows: " << send_data.rows()
<< ", tablet_ids_size: " << request.tablet_ids_size();
g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
Defer defer {
[&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }};
auto write_tablet_data = [&](int64_t tablet_id,
std::function<Status(BaseDeltaWriter * writer)> write_func) {
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
response->mutable_tablet_errors();
// add_batch may concurrency with inc_open but not under _lock.
// so need to protect it with _tablet_writers_lock.
decltype(_tablet_writers.find(tablet_id)) tablet_writer_it;
{
std::lock_guard<std::mutex> l(_tablet_writers_lock);
tablet_writer_it = _tablet_writers.find(tablet_id);
if (tablet_writer_it == _tablet_writers.end()) {
return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
}
}
Status st = write_func(tablet_writer_it->second.get());
if (!st.ok()) {
auto err_msg =
fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}",
tablet_id, _txn_id, st.to_string());
LOG(WARNING) << err_msg;
PTabletError* error = tablet_errors->Add();
error->set_tablet_id(tablet_id);
error->set_msg(err_msg);
static_cast<void>(tablet_writer_it->second->cancel_with_status(st));
_add_broken_tablet(tablet_id);
// continue write to other tablet.
// the error will return back to sender.
}
return Status::OK();
};
SCOPED_TIMER(_write_block_timer);
for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](BaseDeltaWriter* writer) {
return writer->write(&send_data, tablet_to_rowidxs_it.second);
}));
}
{
std::lock_guard<std::mutex> l(_lock);
_next_seqs[request.sender_id()] = cur_seq + 1;
}
return Status::OK();
}
Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
PTabletWriterAddBlockResult* response) {
SCOPED_TIMER(_add_batch_timer);
int64_t cur_seq = 0;
_add_batch_number_counter->update(1);
auto status = _get_current_seq(cur_seq, request);
if (UNLIKELY(!status.ok())) {
return status;
}
if (request.packet_seq() < cur_seq) {
LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq
<< ", recept_seq=" << request.packet_seq();
return Status::OK();
}
std::unordered_map<int64_t /* tablet_id */, DorisVector<uint32_t> /* row index */>
tablet_to_rowidxs;
_build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
return _write_block_data(request, cur_seq, tablet_to_rowidxs, response);
}
void BaseTabletsChannel::_add_broken_tablet(int64_t tablet_id) {
std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock);
_broken_tablets.insert(tablet_id);
}
bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) const {
return _broken_tablets.find(tablet_id) != _broken_tablets.end();
}
void BaseTabletsChannel::_build_tablet_to_rowidxs(
const PTabletWriterAddBlockRequest& request,
std::unordered_map<int64_t, DorisVector<uint32_t>>* tablet_to_rowidxs) {
// just add a coarse-grained read lock here rather than each time when visiting _broken_tablets
// tests show that a relatively coarse-grained read lock here performs better under multicore scenario
// see: https://github.com/apache/doris/pull/28552
std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
if (request.is_single_tablet_block()) {
// The cloud mode need the tablet ids to prepare rowsets.
int64_t tablet_id = request.tablet_ids(0);
tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {0});
return;
}
for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) {
int64_t tablet_id = request.tablet_ids(i);
if (_is_broken_tablet(tablet_id)) {
// skip broken tablets
VLOG_PROGRESS << "skip broken tablet tablet=" << tablet_id;
continue;
}
auto it = tablet_to_rowidxs->find(tablet_id);
if (it == tablet_to_rowidxs->end()) {
tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {i});
} else {
it->second.emplace_back(i);
}
}
}
} // namespace doris