// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <chrono>
#include <cstdint>
#include <limits>
#include <ranges>
#include <tuple>
#include "common/config.h"
#include "common/logging.h"
#include "common/stats.h"
#include "cpp/sync_point.h"
#include "meta-service/doris_txn.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/meta_service_tablet_stats.h"
#include "meta-store/blob_message.h"
#include "meta-store/clone_chain_reader.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
using namespace std::chrono;
namespace doris::cloud {
struct TableStats {
int64_t updated_row_count = 0;
TableStats() = default;
TableStats(int64_t num_rows) : updated_row_count(num_rows) {}
std::string to_string() const {
std::stringstream ss;
ss << "updated_row_count: " << updated_row_count;
return ss.str();
}
};
template <typename Container, typename Range>
static Container to_container(Range&& r) {
return Container(r.begin(), r.end());
}
static void get_pb_from_tablestats(TableStats& stats, TableStatsPB* stats_pb) {
stats_pb->set_updated_row_count(stats.updated_row_count);
}
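// Aggregate per-tablet updated row counts into per-table stats; the table id of each tablet is
// resolved through the tablet_ids index map.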
static void calc_table_stats(std::unordered_map<int64_t, TabletIndexPB>& tablet_ids,
std::unordered_map<int64_t, TabletStats>& tablet_stats,
std::map<int64_t, TableStats>& table_stats,
std::vector<int64_t> base_tablet_ids) {
int64_t table_id;
VLOG_DEBUG << "base_tablet_ids size: " << base_tablet_ids.size();
for (int64_t tablet_id : base_tablet_ids) {
auto it = tablet_stats.find(tablet_id);
if (it == tablet_stats.end()) {
continue;
}
const auto& tablet_stat = it->second;
table_id = tablet_ids[tablet_id].table_id();
if (table_stats.find(table_id) == table_stats.end()) {
table_stats[table_id] = TableStats(tablet_stat.num_rows);
} else {
table_stats[table_id].updated_row_count += tablet_stat.num_rows;
}
}
}
// TODO: we need to move begin/commit etc. txn logic into TxnManager
void MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller,
const BeginTxnRequest* request, BeginTxnResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(begin_txn, get, put);
if (!request->has_txn_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "invalid argument, missing txn info";
return;
}
auto& txn_info = const_cast<TxnInfoPB&>(request->txn_info());
std::string label = txn_info.has_label() ? txn_info.label() : "";
int64_t db_id = txn_info.has_db_id() ? txn_info.db_id() : -1;
if (label.empty() || db_id < 0 || txn_info.table_ids().empty() || !txn_info.has_timeout_ms()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "invalid argument, label=" << label << " db_id=" << db_id;
msg = ss.str();
return;
}
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " label=" << label;
msg = ss.str();
return;
}
RPC_RATE_LIMIT(begin_txn)
//1. Generate version stamp for txn id
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label
<< " db_id=" << db_id;
msg = ss.str();
return;
}
const std::string label_key = txn_label_key({instance_id, db_id, label});
std::string label_val;
err = txn->get(label_key, &label_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get failed(), err=" << err << " label=" << label;
msg = ss.str();
return;
}
LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err;
// err == OK means label has previous txn ids.
if (err == TxnErrorCode::TXN_OK) {
label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN);
}
// If the label did not exist previously (TXN_KEY_NOT_FOUND), label_val stays empty here.
txn->atomic_set_ver_value(label_key, label_val);
LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key);
TEST_SYNC_POINT_CALLBACK("begin_txn:before:commit_txn:1", &label);
err = txn->commit();
TEST_SYNC_POINT_CALLBACK("begin_txn:after:commit_txn:1", &label);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "txn->commit failed(), label=" << label << " err=" << err;
msg = ss.str();
return;
}
// Collect stats before the txn is reset, otherwise these counts would be lost
stats.get_bytes += txn->get_bytes();
stats.put_bytes += txn->put_bytes();
stats.get_counter += txn->num_get_keys();
stats.put_counter += txn->num_put_keys();
//2. Get txn id from version stamp
txn.reset();
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to create txn when get txn id, label=" << label << " err=" << err;
msg = ss.str();
return;
}
label_val.clear();
err = txn->get(label_key, &label_val);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get() failed, label=" << label << " err=" << err;
msg = ss.str();
return;
}
LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err;
// Generated by TxnKv system
int64_t txn_id = 0;
int ret =
get_txn_id_from_fdb_ts(std::string_view(label_val).substr(
label_val.size() - VERSION_STAMP_LEN, label_val.size()),
&txn_id);
if (ret != 0) {
code = MetaServiceCode::TXN_GEN_ID_ERR;
ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << ret;
msg = ss.str();
return;
}
LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " txn_id=" << txn_id
<< " label_val.size()=" << label_val.size();
TxnLabelPB label_pb;
if (label_val.size() > VERSION_STAMP_LEN) {
//3. Check label
//label_val.size() > VERSION_STAMP_LEN means label has previous txn ids.
if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "label_pb->ParseFromString() failed, txn_id=" << txn_id << " label=" << label;
msg = ss.str();
return;
}
// Check if label already used, by following steps
// 1. get all existing transactions
// 2. if there is a PREPARE transaction, check if this is a retry request.
// 3. if there is a non-aborted transaction, throw label already used exception.
for (auto it = label_pb.txn_ids().rbegin(); it != label_pb.txn_ids().rend(); ++it) {
int64_t cur_txn_id = *it;
const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id});
std::string cur_info_val;
err = txn->get(cur_info_key, &cur_info_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
<< " err=" << err;
msg = ss.str();
return;
}
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
//label_to_idx and txn info inconsistency.
code = MetaServiceCode::TXN_ID_NOT_FOUND;
ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
<< " err=" << err;
msg = ss.str();
return;
}
TxnInfoPB cur_txn_info;
if (!cur_txn_info.ParseFromString(cur_info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id
<< " label=" << label << " err=" << err;
msg = ss.str();
return;
}
VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
LOG(INFO) << " size=" << label_pb.txn_ids().size()
<< " status=" << cur_txn_info.status() << " txn_id=" << txn_id
<< " label=" << label;
if (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
if (label_pb.txn_ids().size() >= config::max_num_aborted_txn) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "too many aborted txn for label=" << label << " txn_id=" << txn_id
<< ", please check your data quality";
msg = ss.str();
LOG(WARNING) << msg << " label_pb=" << label_pb.ShortDebugString();
return;
}
break;
}
if (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED ||
cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PRECOMMITTED) {
// clang-format off
if (cur_txn_info.has_request_id() && txn_info.has_request_id() &&
((cur_txn_info.request_id().hi() == txn_info.request_id().hi()) &&
(cur_txn_info.request_id().lo() == txn_info.request_id().lo()))) {
response->set_dup_txn_id(cur_txn_info.txn_id());
code = MetaServiceCode::TXN_DUPLICATED_REQ;
ss << "db_id=" << db_id << " label=" << label << " txn_id=" << cur_txn_info.txn_id() << " dup begin txn request.";
msg = ss.str();
return;
}
// clang-format on
}
response->set_txn_status(cur_txn_info.status());
code = MetaServiceCode::TXN_LABEL_ALREADY_USED;
ss << "Label [" << label << "] has already been used, relate to txn ["
<< cur_txn_info.txn_id() << "], status=[" << TxnStatusPB_Name(cur_txn_info.status())
<< "]";
msg = ss.str();
return;
}
}
// Update txn_info to be put into TxnKv
// Update txn_id in PB
txn_info.set_txn_id(txn_id);
// TODO:
// check initial status must be TXN_STATUS_PREPARED or TXN_STATUS_UNKNOWN
txn_info.set_status(TxnStatusPB::TXN_STATUS_PREPARED);
auto now_time = system_clock::now();
uint64_t prepare_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
txn_info.set_prepare_time(prepare_time);
//4. put txn info and db_tbl
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
std::string info_val;
if (!txn_info.SerializeToString(&info_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_info, label=" << label << " txn_id=" << txn_id;
msg = ss.str();
return;
}
const std::string index_key = txn_index_key({instance_id, txn_id});
std::string index_val;
TxnIndexPB index_pb;
index_pb.mutable_tablet_index()->set_db_id(db_id);
if (!index_pb.SerializeToString(&index_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_index_pb "
<< "label=" << label << " txn_id=" << txn_id;
msg = ss.str();
return;
}
const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
std::string running_val;
TxnRunningPB running_pb;
running_pb.set_timeout_time(prepare_time + txn_info.timeout_ms());
running_pb.mutable_table_ids()->CopyFrom(txn_info.table_ids());
VLOG_DEBUG << "label=" << label << " txn_id=" << txn_id
<< "running_pb=" << running_pb.ShortDebugString();
if (!running_pb.SerializeToString(&running_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize running_pb label=" << label << " txn_id=" << txn_id;
msg = ss.str();
return;
}
label_pb.add_txn_ids(txn_id);
VLOG_DEBUG << "label=" << label << " txn_id=" << txn_id
<< "txn_label_pb=" << label_pb.ShortDebugString();
if (!label_pb.SerializeToString(&label_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_label_pb label=" << label << " txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->atomic_set_ver_value(label_key, label_val);
LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key) << " label=" << label
<< " txn_id=" << txn_id;
txn->put(info_key, info_val);
txn->put(index_key, index_val);
txn->put(running_key, running_val);
LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;
LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id;
LOG(INFO) << "xxx put index_key=" << hex(index_key) << " txn_id=" << txn_id;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit txn kv, label=" << label << " txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
TEST_SYNC_POINT_CALLBACK("begin_txn:after:commit_txn:2", &txn_id);
response->set_txn_id(txn_id);
}
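// Precommit a txn: load its TxnInfoPB, reject aborted/visible/already precommitted txns, then
// move it to TXN_STATUS_PRECOMMITTED and refresh the running key with the precommit timeout.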
void MetaServiceImpl::precommit_txn(::google::protobuf::RpcController* controller,
const PrecommitTxnRequest* request,
PrecommitTxnResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(precommit_txn, get, put);
int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
int64_t db_id = request->has_db_id() ? request->db_id() : -1;
if ((txn_id < 0 && db_id < 0)) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "invalid argument, "
<< "txn_id=" << txn_id << " db_id=" << db_id;
msg = ss.str();
return;
}
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " txn_id=" << txn_id;
msg = ss.str();
return;
}
RPC_RATE_LIMIT(precommit_txn);
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id;
msg = ss.str();
return;
}
// db_id was not provided, so look it up from the txn index.
if (db_id < 0) {
const std::string index_key = txn_index_key({instance_id, txn_id});
std::string index_val;
err = txn->get(index_key, &index_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get db id with txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
TxnIndexPB index_pb;
if (!index_pb.ParseFromString(index_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info"
<< " txn_id=" << txn_id;
msg = ss.str();
return;
}
DCHECK(index_pb.has_tablet_index() == true);
DCHECK(index_pb.tablet_index().has_db_id() == true);
db_id = index_pb.tablet_index().db_id();
VLOG_DEBUG << " find db_id=" << db_id << " from index";
} else {
db_id = request->db_id();
}
// Get txn info with db_id and txn_id
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
std::string info_val; // Will be reused when saving updated txn
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get db id with db_id=" << db_id << " txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
TxnInfoPB txn_info;
if (!txn_info.ParseFromString(info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_inf db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
DCHECK(txn_info.txn_id() == txn_id);
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
code = MetaServiceCode::TXN_ALREADY_ABORTED;
ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
code = MetaServiceCode::TXN_ALREADY_VISIBLE;
ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
if (txn_info.status() == TxnStatusPB::TXN_STATUS_PRECOMMITTED) {
code = MetaServiceCode::TXN_ALREADY_PRECOMMITED;
ss << "transaction is already precommitted: db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
LOG(INFO) << "before update txn_info=" << txn_info.ShortDebugString();
// Update txn_info
txn_info.set_status(TxnStatusPB::TXN_STATUS_PRECOMMITTED);
auto now_time = system_clock::now();
uint64_t precommit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
txn_info.set_precommit_time(precommit_time);
if (request->has_commit_attachment()) {
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
}
LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
info_val.clear();
if (!txn_info.SerializeToString(&info_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(info_key, info_val);
LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;
const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
std::string running_val;
TxnRunningPB running_pb;
running_pb.set_timeout_time(precommit_time + txn_info.precommit_timeout_ms());
running_pb.mutable_table_ids()->CopyFrom(txn_info.table_ids());
if (!running_pb.SerializeToString(&running_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize running_pb, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(running_key, running_val);
LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit txn kv, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
}
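// Merge the routine load progress carried in the commit attachment (per-partition offsets and
// job statistics) with any previously stored progress and write it back under the progress key.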
void put_routine_load_progress(MetaServiceCode& code, std::string& msg,
const std::string& instance_id, const CommitTxnRequest* request,
Transaction* txn, int64_t db_id) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
if (!request->has_commit_attachment()) {
ss << "failed to get commit attachment from req, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment();
RLTaskTxnCommitAttachmentPB commit_attachment =
txn_commit_attachment.rl_task_txn_commit_attachment();
int64_t job_id = commit_attachment.job_id();
std::string rl_progress_key;
std::string rl_progress_val;
bool prev_progress_existed = true;
RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val);
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
prev_progress_existed = false;
} else {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get routine load progress, db_id=" << db_id << " txn_id=" << txn_id
<< " err=" << err;
msg = ss.str();
return;
}
}
RoutineLoadProgressPB prev_progress_info;
if (prev_progress_existed) {
if (!prev_progress_info.ParseFromString(rl_progress_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse routine load progress, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
}
std::string new_progress_val;
RoutineLoadProgressPB new_progress_info;
new_progress_info.CopyFrom(commit_attachment.progress());
for (auto const& elem : prev_progress_info.partition_to_offset()) {
auto it = new_progress_info.partition_to_offset().find(elem.first);
if (it == new_progress_info.partition_to_offset().end()) {
new_progress_info.mutable_partition_to_offset()->insert(elem);
}
}
std::string new_statistic_val;
RoutineLoadJobStatisticPB* new_statistic_info = new_progress_info.mutable_stat();
if (prev_progress_info.has_stat()) {
const RoutineLoadJobStatisticPB& prev_statistic_info = prev_progress_info.stat();
new_statistic_info->set_filtered_rows(prev_statistic_info.filtered_rows() +
commit_attachment.filtered_rows());
new_statistic_info->set_loaded_rows(prev_statistic_info.loaded_rows() +
commit_attachment.loaded_rows());
new_statistic_info->set_unselected_rows(prev_statistic_info.unselected_rows() +
commit_attachment.unselected_rows());
new_statistic_info->set_received_bytes(prev_statistic_info.received_bytes() +
commit_attachment.received_bytes());
new_statistic_info->set_task_execution_time_ms(
prev_statistic_info.task_execution_time_ms() +
commit_attachment.task_execution_time_ms());
} else {
new_statistic_info->set_filtered_rows(commit_attachment.filtered_rows());
new_statistic_info->set_loaded_rows(commit_attachment.loaded_rows());
new_statistic_info->set_unselected_rows(commit_attachment.unselected_rows());
new_statistic_info->set_received_bytes(commit_attachment.received_bytes());
new_statistic_info->set_task_execution_time_ms(commit_attachment.task_execution_time_ms());
}
if (!new_progress_info.SerializeToString(&new_progress_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize new progress val, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(rl_progress_key, new_progress_val);
LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key)
<< " routine load new progress: " << new_progress_info.ShortDebugString();
}
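// Accumulate streaming job statistics (scanned rows, load/file bytes, file count) from the
// commit attachment into the stored streaming job meta and persist the latest offset.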
void update_streaming_job_meta(MetaServiceCode& code, std::string& msg,
const std::string& instance_id, const CommitTxnRequest* request,
Transaction* txn, int64_t db_id) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
if (!request->has_commit_attachment()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "missing commit attachment, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment();
StreamingTaskCommitAttachmentPB commit_attachment =
txn_commit_attachment.streaming_task_txn_commit_attachment();
int64_t job_id = commit_attachment.job_id();
std::string streaming_job_val;
bool prev_existed = true;
std::string streaming_job_key_str = streaming_job_key({instance_id, db_id, job_id});
TxnErrorCode err = txn->get(streaming_job_key_str, &streaming_job_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
prev_existed = false;
} else if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get streaming job, db_id=" << db_id << " txn_id=" << txn_id
<< " err=" << err;
msg = ss.str();
return;
}
StreamingTaskCommitAttachmentPB new_job_info;
if (prev_existed) {
if (!new_job_info.ParseFromString(streaming_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse streaming job meta, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
new_job_info.set_scanned_rows(new_job_info.scanned_rows() +
commit_attachment.scanned_rows());
new_job_info.set_load_bytes(new_job_info.load_bytes() + commit_attachment.load_bytes());
new_job_info.set_num_files(new_job_info.num_files() + commit_attachment.num_files());
new_job_info.set_file_bytes(new_job_info.file_bytes() + commit_attachment.file_bytes());
} else {
new_job_info.set_job_id(commit_attachment.job_id());
new_job_info.set_scanned_rows(commit_attachment.scanned_rows());
new_job_info.set_load_bytes(commit_attachment.load_bytes());
new_job_info.set_num_files(commit_attachment.num_files());
new_job_info.set_file_bytes(commit_attachment.file_bytes());
}
if (commit_attachment.has_offset()) {
new_job_info.set_offset(commit_attachment.offset());
}
std::string new_job_val;
if (!new_job_info.SerializeToString(&new_job_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize new streaming job val, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(streaming_job_key_str, new_job_val);
LOG(INFO) << "put streaming_job_key key=" << hex(streaming_job_key_str)
<< " streaming job new meta: " << new_job_info.ShortDebugString();
}
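// Return the stored routine load progress (and its statistics, if present) for the given
// db_id/job_id as a commit attachment.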
void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller,
const GetRLTaskCommitAttachRequest* request,
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(get_rl_task_commit_attach, get);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
RPC_RATE_LIMIT(get_rl_task_commit_attach)
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "filed to create txn, err=" << err;
msg = ss.str();
return;
}
if (!request->has_db_id() || !request->has_job_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty db_id or job_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
int64_t db_id = request->db_id();
int64_t job_id = request->job_id();
std::string rl_progress_key;
std::string rl_progress_val;
RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
err = txn->get(rl_progress_key, &rl_progress_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND;
ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err;
msg = ss.str();
return;
} else if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get progress info, db_id=" << db_id << " job_id=" << job_id
<< " err=" << err;
msg = ss.str();
return;
}
RLTaskTxnCommitAttachmentPB* commit_attach = response->mutable_commit_attach();
RoutineLoadProgressPB* progress_info = commit_attach->mutable_progress();
if (!progress_info->ParseFromString(rl_progress_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse progress info, db_id=" << db_id << " job_id=" << job_id;
msg = ss.str();
return;
}
if (progress_info->has_stat()) {
const RoutineLoadJobStatisticPB& statistic_info = progress_info->stat();
commit_attach->set_filtered_rows(statistic_info.filtered_rows());
commit_attach->set_loaded_rows(statistic_info.loaded_rows());
commit_attach->set_unselected_rows(statistic_info.unselected_rows());
commit_attach->set_received_bytes(statistic_info.received_bytes());
commit_attach->set_task_execution_time_ms(statistic_info.task_execution_time_ms());
}
}
void MetaServiceImpl::get_streaming_task_commit_attach(
::google::protobuf::RpcController* controller,
const GetStreamingTaskCommitAttachRequest* request,
GetStreamingTaskCommitAttachResponse* response, ::google::protobuf::Closure* done) {
RPC_PREPROCESS(get_streaming_task_commit_attach, get);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
RPC_RATE_LIMIT(get_streaming_task_commit_attach)
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "filed to create txn, err=" << err;
msg = ss.str();
return;
}
if (!request->has_db_id() || !request->has_job_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty db_id or job_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
int64_t db_id = request->db_id();
int64_t job_id = request->job_id();
std::string streaming_job_val;
std::string streaming_job_key_str = streaming_job_key({instance_id, db_id, job_id});
err = txn->get(streaming_job_key_str, &streaming_job_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND;
ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err;
msg = ss.str();
return;
} else if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get progress info, db_id=" << db_id << " job_id=" << job_id
<< " err=" << err;
msg = ss.str();
return;
}
StreamingTaskCommitAttachmentPB* commit_attach = response->mutable_commit_attach();
if (!commit_attach->ParseFromString(streaming_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse meta info, db_id=" << db_id << " job_id=" << job_id;
msg = ss.str();
return;
}
}
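// Reset routine load progress: with no partition offsets in the request the progress key is
// removed; otherwise the given offsets overwrite the stored ones and untouched partitions keep
// their previous offsets.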
void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request,
ResetRLProgressResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(reset_rl_progress, get, put, del);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
RPC_RATE_LIMIT(reset_rl_progress)
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "filed to create txn, err=" << err;
msg = ss.str();
return;
}
if (!request->has_db_id() || !request->has_job_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty db_id or job_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
int64_t db_id = request->db_id();
int64_t job_id = request->job_id();
std::string rl_progress_key;
std::string rl_progress_val;
RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
if (request->partition_to_offset().size() == 0) {
txn->remove(rl_progress_key);
LOG(INFO) << "remove rl_progress_key key=" << hex(rl_progress_key);
}
if (request->partition_to_offset().size() > 0) {
bool prev_progress_existed = true;
RoutineLoadProgressPB prev_progress_info;
TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val);
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
prev_progress_existed = false;
} else {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get routine load progress, db_id=" << db_id << "job_id=" << job_id
<< " err=" << err;
msg = ss.str();
return;
}
}
if (prev_progress_existed) {
if (!prev_progress_info.ParseFromString(rl_progress_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse routine load progress, db_id=" << db_id
<< "job_id=" << job_id;
msg = ss.str();
return;
}
}
std::string new_progress_val;
RoutineLoadProgressPB new_progress_info;
for (auto const& elem : request->partition_to_offset()) {
new_progress_info.mutable_partition_to_offset()->insert(elem);
}
if (request->partition_to_offset().size() > 0) {
for (auto const& elem : prev_progress_info.partition_to_offset()) {
auto it = new_progress_info.partition_to_offset().find(elem.first);
if (it == new_progress_info.partition_to_offset().end()) {
new_progress_info.mutable_partition_to_offset()->insert(elem);
}
}
}
if (!new_progress_info.SerializeToString(&new_progress_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize new progress val"
<< "db_id=" << db_id << "job_id=" << job_id;
msg = ss.str();
return;
}
txn->put(rl_progress_key, new_progress_val);
LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key);
}
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit progress info, db_id=" << db_id << " job_id=" << job_id
<< " err=" << err;
msg = ss.str();
return;
}
}
void MetaServiceImpl::reset_streaming_job_offset(::google::protobuf::RpcController* controller,
const ResetStreamingJobOffsetRequest* request,
ResetStreamingJobOffsetResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(reset_streaming_job_offset, get, put, del);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
RPC_RATE_LIMIT(reset_streaming_job_offset)
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to create txn, err=" << err;
msg = ss.str();
return;
}
if (!request->has_db_id() || !request->has_job_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty db_id or job_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
int64_t db_id = request->db_id();
int64_t job_id = request->job_id();
std::string streaming_job_key_str = streaming_job_key({instance_id, db_id, job_id});
std::string streaming_job_val;
// If no offset provided, remove the streaming job progress
if (!request->has_offset()) {
txn->remove(streaming_job_key_str);
LOG(INFO) << "remove streaming_job_key key=" << hex(streaming_job_key_str);
} else {
// If offset is provided, update the streaming job progress
bool prev_existed = true;
StreamingTaskCommitAttachmentPB prev_job_info;
err = txn->get(streaming_job_key_str, &streaming_job_val);
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
prev_existed = false;
} else {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get streaming job progress, db_id=" << db_id
<< " job_id=" << job_id << " err=" << err;
msg = ss.str();
return;
}
}
if (prev_existed) {
if (!prev_job_info.ParseFromString(streaming_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse streaming job offset, db_id=" << db_id
<< " job_id=" << job_id;
msg = ss.str();
return;
}
}
std::string new_job_val;
StreamingTaskCommitAttachmentPB new_job_info;
// Set the new offset
new_job_info.set_offset(request->offset());
new_job_info.set_job_id(job_id);
// Preserve existing statistics if they exist
if (prev_existed) {
new_job_info.set_scanned_rows(prev_job_info.scanned_rows());
new_job_info.set_load_bytes(prev_job_info.load_bytes());
new_job_info.set_num_files(prev_job_info.num_files());
new_job_info.set_file_bytes(prev_job_info.file_bytes());
}
if (!new_job_info.SerializeToString(&new_job_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize new streaming job val, db_id=" << db_id
<< " job_id=" << job_id;
msg = ss.str();
return;
}
txn->put(streaming_job_key_str, new_job_val);
LOG(INFO) << "reset offset, put streaming_job_key key=" << hex(streaming_job_key_str)
<< " prev job val: " << prev_job_info.ShortDebugString()
<< " new job val: " << new_job_info.ShortDebugString();
}
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit streaming job offset, db_id=" << db_id << " job_id=" << job_id
<< " err=" << err;
msg = ss.str();
return;
}
}
void MetaServiceImpl::delete_streaming_job(::google::protobuf::RpcController* controller,
const DeleteStreamingJobRequest* request,
DeleteStreamingJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(delete_streaming_job, del);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
RPC_RATE_LIMIT(delete_streaming_job)
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "filed to create txn, err=" << err;
msg = ss.str();
return;
}
if (!request->has_db_id() || !request->has_job_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "missing db_id or job_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
int64_t db_id = request->db_id();
int64_t job_id = request->job_id();
std::string key_to_delete = streaming_job_key({instance_id, db_id, job_id});
txn->remove(key_to_delete);
LOG(INFO) << "remove key=" << hex(key_to_delete);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit delete, err=" << err;
msg = ss.str();
return;
}
}
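// Resolve the db_id a txn belongs to by reading its txn index key.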
void get_txn_db_id(TxnKv* txn_kv, const std::string& instance_id, int64_t txn_id,
MetaServiceCode& code, std::string& msg, int64_t* db_id, KVStats* stats) {
std::stringstream ss;
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
DORIS_CLOUD_DEFER {
if (!stats || !txn) return;
stats->get_bytes += txn->get_bytes();
stats->get_counter += txn->num_get_keys();
};
// Get db id with txn id
std::string index_val;
const std::string index_key = txn_index_key({instance_id, txn_id});
err = txn->get(index_key, &index_val);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
TxnIndexPB index_pb;
if (!index_pb.ParseFromString(index_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_index_pb, txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
DCHECK(index_pb.has_tablet_index() == true);
DCHECK(index_pb.tablet_index().has_db_id() == true);
*db_id = index_pb.tablet_index().db_id();
}
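// Range-scan all tmp rowset metas written under this txn; the keys are recorded alongside the
// metas so they can be removed once the txn is committed.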
void scan_tmp_rowset(
const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv> txn_kv,
MetaServiceCode& code, std::string& msg,
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>* tmp_rowsets_meta,
KVStats* stats) {
// Create a readonly txn for scan tmp rowset
std::stringstream ss;
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
DORIS_CLOUD_DEFER {
if (!stats || !txn) return;
stats->get_bytes += txn->get_bytes();
stats->get_counter += txn->num_get_keys();
};
// Get temporary rowsets involved in the txn
// This is a range scan
MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, txn_id, 0};
MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, txn_id + 1, 0};
std::string rs_tmp_key0;
std::string rs_tmp_key1;
meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0);
meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);
int num_rowsets = 0;
DORIS_CLOUD_DEFER_COPY(rs_tmp_key0, rs_tmp_key1) {
LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id << " num_rowsets=" << num_rowsets
<< " range=[" << hex(rs_tmp_key0) << "," << hex(rs_tmp_key1) << ")";
};
std::unique_ptr<RangeGetIterator> it;
do {
err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
if (err == TxnErrorCode::TXN_TOO_OLD) {
err = txn_kv->create_txn(&txn);
if (err == TxnErrorCode::TXN_OK) {
err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
}
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "internal error, failed to get tmp rowset while committing, txn_id=" << txn_id
<< " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
while (it->has_next()) {
auto [k, v] = it->next();
LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << " txn_id=" << txn_id;
tmp_rowsets_meta->emplace_back();
if (!tmp_rowsets_meta->back().second.ParseFromArray(v.data(), v.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "malformed rowset meta, unable to initialize, txn_id=" << txn_id
<< " key=" << hex(k);
msg = ss.str();
LOG(WARNING) << msg;
return;
}
// Save keys that will be removed later
tmp_rowsets_meta->back().first = std::string(k.data(), k.size());
++num_rowsets;
if (!it->has_next()) rs_tmp_key0 = k;
}
rs_tmp_key0.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
VLOG_DEBUG << "txn_id=" << txn_id << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta->size();
return;
}
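// Apply per-tablet stat deltas: with config::split_tablet_stats the individual stat keys are
// atomic-added, otherwise the aggregated TabletStatsPB is read, updated and written back.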
void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stats,
std::unique_ptr<Transaction>& txn, MetaServiceCode& code,
std::string& msg) {
if (config::split_tablet_stats) {
if (stats.num_segs > 0) {
std::string data_size_key;
stats_tablet_data_size_key(info, &data_size_key);
txn->atomic_add(data_size_key, stats.data_size);
std::string num_rows_key;
stats_tablet_num_rows_key(info, &num_rows_key);
txn->atomic_add(num_rows_key, stats.num_rows);
std::string num_segs_key;
stats_tablet_num_segs_key(info, &num_segs_key);
txn->atomic_add(num_segs_key, stats.num_segs);
std::string index_size_key;
stats_tablet_index_size_key(info, &index_size_key);
txn->atomic_add(index_size_key, stats.index_size);
std::string segment_size_key;
stats_tablet_segment_size_key(info, &segment_size_key);
txn->atomic_add(segment_size_key, stats.segment_size);
}
std::string num_rowsets_key;
stats_tablet_num_rowsets_key(info, &num_rowsets_key);
txn->atomic_add(num_rowsets_key, stats.num_rowsets);
} else {
std::string key;
stats_tablet_key(info, &key);
std::string val;
TxnErrorCode err = txn->get(key, &val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
std::get<4>(info));
return;
}
TabletStatsPB stats_pb;
if (!stats_pb.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("malformed tablet stats value, key={}", hex(key));
return;
}
stats_pb.set_data_size(stats_pb.data_size() + stats.data_size);
stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows);
stats_pb.set_num_rowsets(stats_pb.num_rowsets() + stats.num_rowsets);
stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs);
stats_pb.set_index_size(stats_pb.index_size() + stats.index_size);
stats_pb.set_segment_size(stats_pb.segment_size() + stats.segment_size);
stats_pb.SerializeToString(&val);
txn->put(key, val);
LOG(INFO) << "put stats_tablet_key key=" << hex(key);
}
}
// Process MoW tables: verify the delete bitmap update lock is still held by this txn and remove
// the pending delete bitmap keys of its tablets.
void process_mow_when_commit_txn(
const CommitTxnRequest* request, const std::string& instance_id, MetaServiceCode& code,
std::string& msg, std::unique_ptr<Transaction>& txn,
std::unordered_map<int64_t, std::vector<int64_t>>& table_id_tablet_ids) {
int64_t txn_id = request->txn_id();
std::stringstream ss;
std::vector<std::string> lock_keys;
lock_keys.reserve(request->mow_table_ids().size());
for (auto table_id : request->mow_table_ids()) {
lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}));
}
std::vector<std::optional<std::string>> lock_values;
TxnErrorCode err = txn->batch_get(&lock_values, lock_keys);
if (err != TxnErrorCode::TXN_OK) {
ss << "failed to get delete bitmap update lock key info, instance_id=" << instance_id
<< " err=" << err;
msg = ss.str();
code = cast_as<ErrCategory::READ>(err);
LOG(WARNING) << msg << " txn_id=" << txn_id;
return;
}
size_t total_locks = lock_keys.size();
for (size_t i = 0; i < total_locks; i++) {
int64_t table_id = request->mow_table_ids(i);
// When the key does not exist, it means the lock has been acquired
// by another transaction and successfully committed.
if (!lock_values[i].has_value()) {
ss << "get delete bitmap update lock info, lock is expired"
<< " table_id=" << table_id << " key=" << hex(lock_keys[i]) << " txn_id=" << txn_id;
code = MetaServiceCode::LOCK_EXPIRED;
msg = ss.str();
LOG(WARNING) << msg << " txn_id=" << txn_id;
return;
}
DeleteBitmapUpdateLockPB lock_info;
if (!lock_info.ParseFromString(lock_values[i].value())) [[unlikely]] {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "failed to parse DeleteBitmapUpdateLockPB";
LOG(WARNING) << msg << " txn_id=" << txn_id;
return;
}
if (lock_info.lock_id() != request->txn_id()) {
ss << "lock is expired, locked by lock_id=" << lock_info.lock_id();
msg = ss.str();
code = MetaServiceCode::LOCK_EXPIRED;
return;
}
txn->remove(lock_keys[i]);
LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_keys[i])
<< " table_id=" << table_id << " txn_id=" << txn_id;
for (auto tablet_id : table_id_tablet_ids[table_id]) {
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id});
txn->remove(pending_key);
LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" << hex(pending_key)
<< " txn_id=" << txn_id;
}
}
lock_keys.clear();
lock_values.clear();
}
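// Batch-get the TabletIndexPB (table/index/partition mapping) for each tablet id; every tablet
// is expected to have an index entry.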
std::pair<MetaServiceCode, std::string> get_tablet_indexes(
Transaction* txn, std::unordered_map<int64_t, TabletIndexPB>* tablet_indexes,
std::string_view instance_id, const std::vector<int64_t>& tablet_ids,
bool snapshot = false) {
std::vector<std::string> tablet_idx_keys;
std::vector<std::optional<std::string>> tablet_idx_values;
tablet_idx_keys.reserve(tablet_ids.size());
tablet_idx_values.reserve(tablet_ids.size());
for (int64_t tablet_id : tablet_ids) {
tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, tablet_id}));
}
TxnErrorCode err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
Transaction::BatchGetOptions(false));
if (err != TxnErrorCode::TXN_OK) {
auto msg = fmt::format("failed to get tablet table index ids, err={}", err);
LOG_WARNING(msg);
return {cast_as<ErrCategory::READ>(err), msg};
}
size_t total_tablets = tablet_idx_values.size();
for (size_t i = 0; i < total_tablets; i++) {
int64_t tablet_id = tablet_ids[i];
if (!tablet_idx_values[i].has_value()) [[unlikely]] {
// The value must exist
auto msg = fmt::format(
"failed to get tablet table index ids, err=not found tablet_id={} ", tablet_id);
LOG_WARNING(msg).tag("err", err).tag("key", hex(tablet_idx_keys[i]));
return {MetaServiceCode::KV_TXN_GET_ERR, msg};
}
TabletIndexPB tablet_index;
if (!tablet_index.ParseFromString(tablet_idx_values[i].value())) [[unlikely]] {
auto msg = fmt::format("malformed tablet index value tablet_id={} snapshot={}",
tablet_id, snapshot);
LOG_WARNING(msg).tag("key", hex(tablet_idx_keys[i]));
return {MetaServiceCode::PROTOBUF_PARSE_ERR, msg};
}
VLOG_DEBUG << "tablet_id:" << tablet_id << " value:" << tablet_index.ShortDebugString();
tablet_indexes->emplace(tablet_id, std::move(tablet_index));
}
return {MetaServiceCode::OK, ""};
}
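// Batch-get the current version of each partition; missing version keys default to version 1.
// If any partition still has a pending (lazy-committed) txn, it is reported via
// last_pending_txn_id and the scan returns early.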
std::pair<MetaServiceCode, std::string> get_partition_versions(
Transaction* txn, std::unordered_map<int64_t, int64_t>* versions,
int64_t* last_pending_txn_id, std::string_view instance_id,
std::unordered_map<int64_t, std::tuple<int64_t, int64_t>>& partition_indexes) {
std::vector<int64_t> partition_ids;
std::vector<std::string> version_keys;
std::vector<std::optional<std::string>> version_values;
partition_ids.reserve(partition_indexes.size());
version_keys.reserve(partition_indexes.size());
version_values.reserve(partition_indexes.size());
for (auto&& [partition_id, db_and_table] : partition_indexes) {
auto [db_id, table_id] = db_and_table;
std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id});
version_keys.push_back(std::move(ver_key));
partition_ids.push_back(partition_id);
}
TxnErrorCode err = txn->batch_get(&version_values, version_keys);
if (err != TxnErrorCode::TXN_OK) {
auto msg = fmt::format("failed to get partition versions, err={}", err);
LOG_WARNING(msg);
return {cast_as<ErrCategory::READ>(err), msg};
}
size_t total_versions = version_keys.size();
for (size_t i = 0; i < total_versions; i++) {
int64_t version;
if (version_values[i].has_value()) {
VersionPB version_pb;
if (!version_pb.ParseFromString(version_values[i].value())) {
auto msg = fmt::format("malformed version value, key={}", hex(version_keys[i]));
LOG_WARNING(msg);
return {MetaServiceCode::PROTOBUF_PARSE_ERR, msg};
}
if (version_pb.pending_txn_ids_size() > 0) {
DCHECK(version_pb.pending_txn_ids_size() == 1);
*last_pending_txn_id = version_pb.pending_txn_ids(0);
DCHECK(*last_pending_txn_id > 0);
return {MetaServiceCode::OK, ""};
}
version = version_pb.version();
} else {
version = 1;
}
VLOG_DEBUG << "get partition version, partition_id=" << partition_ids[i]
<< " version=" << version << " key=" << hex(version_keys[i])
<< " has_value=" << version_values[i].has_value();
versions->emplace(partition_ids[i], version);
*last_pending_txn_id = 0;
}
return {MetaServiceCode::OK, ""};
}
/**
* 0. Extract txn_id from request
* 1. Get db id from TxnKv with txn_id
* 2. Get TxnInfo from TxnKv with db_id and txn_id
* 3. Get tmp rowset meta, there may be several or hundred of tmp rowsets
* 4. Get versions of each rowset
* 5. Put rowset meta, which will be visible to user
* 6. Put TxnInfo back into TxnKv with updated txn status (committed)
* 7. Update versions of each partition
* 8. Remove tmp rowset meta
*
* Note: getting versions and all changes made are in a single TxnKv transaction:
* step 5, 6, 7, 8
*/
void MetaServiceImpl::commit_txn_immediately(
const CommitTxnRequest* request, CommitTxnResponse* response, MetaServiceCode& code,
std::string& msg, const std::string& instance_id, int64_t db_id,
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta,
TxnErrorCode& err, KVStats& stats) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
bool is_versioned_write = is_version_write_enabled(instance_id);
bool is_versioned_read = is_version_read_enabled(instance_id);
do {
TEST_SYNC_POINT_CALLBACK("commit_txn_immediately:begin", &txn_id);
int64_t last_pending_txn_id = 0;
std::unique_ptr<Transaction> txn;
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
DORIS_CLOUD_DEFER {
if (txn == nullptr) return;
stats.get_bytes += txn->get_bytes();
stats.put_bytes += txn->put_bytes();
stats.del_bytes += txn->delete_bytes();
stats.get_counter += txn->num_get_keys();
stats.put_counter += txn->num_put_keys();
stats.del_counter += txn->num_del_keys();
};
// Get txn info with db_id and txn_id
std::string info_val; // Will be reused when saving updated txn
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
ss << "transaction [" << txn_id << "] not found, db_id=" << db_id;
} else {
ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
<< " err=" << err;
}
msg = ss.str();
LOG(WARNING) << msg;
return;
}
TxnInfoPB txn_info;
if (!txn_info.ParseFromString(info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
// TODO: do more check like txn state, 2PC etc.
DCHECK(txn_info.txn_id() == txn_id);
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
code = MetaServiceCode::TXN_ALREADY_ABORTED;
ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
if (request->has_is_2pc() && request->is_2pc()) {
code = MetaServiceCode::TXN_ALREADY_VISIBLE;
ss << "transaction [" << txn_id << "] is already visible, not pre-committed.";
msg = ss.str();
LOG(INFO) << msg;
response->mutable_txn_info()->CopyFrom(txn_info);
return;
}
code = MetaServiceCode::OK;
ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(INFO) << msg;
response->mutable_txn_info()->CopyFrom(txn_info);
return;
}
if (request->has_is_2pc() && request->is_2pc() &&
txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) {
code = MetaServiceCode::TXN_INVALID_STATUS;
ss << "transaction is prepare, not pre-committed: db_id=" << db_id << " txn_id"
<< txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString();
CloneChainReader meta_reader(instance_id, resource_mgr_.get());
// Prepare rowset meta and new_versions
AnnotateTag txn_tag("txn_id", txn_id);
std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
std::ranges::ref_view(tmp_rowsets_meta) |
std::ranges::views::transform(
[](const auto& pair) { return pair.second.tablet_id(); }));
if (!is_versioned_read) {
std::tie(code, msg) =
get_tablet_indexes(txn.get(), &tablet_ids, instance_id, acquired_tablet_ids);
if (code != MetaServiceCode::OK) {
return;
}
} else {
err = meta_reader.get_tablet_indexes(txn.get(), acquired_tablet_ids, &tablet_ids);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet indexes, err={}", err);
LOG_WARNING(msg);
return;
}
}
std::unordered_map<int64_t, std::tuple<int64_t, int64_t>> partition_indexes;
for (auto& [_, i] : tmp_rowsets_meta) {
int64_t tablet_id = i.tablet_id();
int64_t table_id = tablet_ids[tablet_id].table_id();
int64_t partition_id = i.partition_id();
partition_indexes.insert({partition_id, {db_id, table_id}});
}
// {table/partition} -> version
std::unordered_map<int64_t, int64_t> versions;
if (!is_versioned_read) {
std::tie(code, msg) = get_partition_versions(txn.get(), &versions, &last_pending_txn_id,
instance_id, partition_indexes);
if (code != MetaServiceCode::OK) {
return;
}
} else {
std::vector<int64_t> partition_ids = to_container<std::vector<int64_t>>(
std::ranges::ref_view(partition_indexes) | std::ranges::views::keys);
err = meta_reader.get_partition_versions(txn.get(), partition_ids, &versions,
&last_pending_txn_id);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get partition versions, err={}", err);
LOG_WARNING(msg);
return;
}
}
if (last_pending_txn_id > 0) {
stats.get_bytes += txn->get_bytes();
stats.get_counter += txn->num_get_keys();
txn.reset();
TEST_SYNC_POINT_CALLBACK("commit_txn_immediately::advance_last_pending_txn_id",
&last_pending_txn_id);
std::shared_ptr<TxnLazyCommitTask> task =
txn_lazy_committer_->submit(instance_id, last_pending_txn_id);
std::tie(code, msg) = task->wait();
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
<< " code=" << code << " msg=" << msg;
return;
}
last_pending_txn_id = 0;
continue;
}
record_txn_commit_stats(txn.get(), instance_id, partition_indexes.size(), tablet_ids.size(),
txn_id);
CommitTxnLogPB commit_txn_log;
commit_txn_log.set_txn_id(txn_id);
commit_txn_log.set_db_id(db_id);
// <tablet_id, version> -> rowset meta
std::vector<std::pair<std::tuple<int64_t, int64_t>, const RowsetMetaCloudPB&>> rowsets;
std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats
rowsets.reserve(tmp_rowsets_meta.size());
int64_t rowsets_visible_ts_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
for (auto& [_, i] : tmp_rowsets_meta) {
int64_t tablet_id = i.tablet_id();
int64_t partition_id = i.partition_id();
if (!versions.contains(partition_id)) [[unlikely]] {
// This should never happen.
code = MetaServiceCode::UNDEFINED_ERR;
ss << "failed to get partition version, the target version does not exist in "
"the fetched versions."
<< " txn_id=" << txn_id << " partition_id=" << partition_id;
ss << " versions:";
for (const auto& [pid, ver] : versions) {
ss << " partition_id=" << pid << " version=" << ver;
}
msg = ss.str();
LOG(ERROR) << msg;
return;
}
// Update rowset version
int64_t new_version = versions[partition_id] + 1;
i.set_start_version(new_version);
i.set_end_version(new_version);
i.set_visible_ts_ms(rowsets_visible_ts_ms);
// Accumulate affected rows
auto& stats = tablet_stats[tablet_id];
stats.data_size += i.total_disk_size();
stats.num_rows += i.num_rows();
++stats.num_rowsets;
stats.num_segs += i.num_segments();
stats.index_size += i.index_disk_size();
stats.segment_size += i.data_disk_size();
commit_txn_log.mutable_tablet_to_partition_map()->insert({tablet_id, partition_id});
commit_txn_log.mutable_partition_version_map()->insert({partition_id, new_version});
rowsets.emplace_back(std::make_tuple(tablet_id, i.end_version()), i);
} // for tmp_rowsets_meta
std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
for (auto& [tablet_id, tablet_index] : tablet_ids) {
int64_t table_id = tablet_index.table_id();
table_id_tablet_ids[table_id].push_back(tablet_id);
}
process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
return;
}
// Save rowset meta
for (auto& i : rowsets) {
auto [tablet_id, version] = i.first;
std::string rowset_key = meta_rowset_key({instance_id, tablet_id, version});
std::string val;
if (!i.second.SerializeToString(&val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
msg = ss.str();
return;
}
size_t rowset_size = rowset_key.size() + val.size();
txn->put(rowset_key, val);
LOG(INFO) << "put rowset_key=" << hex(rowset_key) << " txn_id=" << txn_id
<< " rowset_size=" << rowset_size;
if (is_versioned_write) {
auto& rowset_meta = i.second;
std::string meta_rowset_key = versioned::meta_rowset_key(
{instance_id, tablet_id, rowset_meta.rowset_id_v2()});
if (config::enable_recycle_rowset_strip_key_bounds) {
doris::RowsetMetaCloudPB rowset_meta_copy = rowset_meta;
// Strip key bounds to shrink operation log for ts compaction recycle entries
rowset_meta_copy.clear_segments_key_bounds();
rowset_meta_copy.clear_segments_key_bounds_truncated();
blob_put(txn.get(), meta_rowset_key, rowset_meta_copy.SerializeAsString(), 0);
} else {
blob_put(txn.get(), meta_rowset_key, rowset_meta.SerializeAsString(), 0);
}
LOG(INFO) << "put versioned meta_rowset_key=" << hex(meta_rowset_key);
std::string versioned_rowset_key =
versioned::meta_rowset_load_key({instance_id, tablet_id, version});
RowsetMetaCloudPB copied_rowset_meta(i.second);
if (!versioned::document_put(txn.get(), versioned_rowset_key,
std::move(copied_rowset_meta))) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to put versioned rowset meta, txn_id=" << txn_id
<< " key=" << hex(versioned_rowset_key);
msg = ss.str();
LOG(WARNING) << msg;
return;
}
LOG(INFO) << "put versioned rowset meta key=" << hex(versioned_rowset_key)
<< ", txn_id=" << txn_id;
}
}
// Save versions
int64_t version_update_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
response->set_version_update_time_ms(version_update_time_ms);
for (auto& [partition_id, version] : versions) {
int64_t new_version = version + 1;
std::string ver_val;
VersionPB version_pb;
version_pb.set_version(new_version);
version_pb.set_update_time_ms(version_update_time_ms);
if (!version_pb.SerializeToString(&ver_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize version_pb when saving, txn_id=" << txn_id;
msg = ss.str();
return;
}
auto [db_id, table_id] = partition_indexes[partition_id];
auto version_key = partition_version_key({instance_id, db_id, table_id, partition_id});
txn->put(version_key, ver_val);
LOG(INFO) << "put partition_version_key=" << hex(version_key)
<< " version:" << new_version << " txn_id=" << txn_id
<< " partition_id=" << partition_id
<< " update_time=" << version_update_time_ms;
VLOG_DEBUG << " table_id=" << table_id << " partition_id=" << partition_id;
if (is_versioned_write) {
std::string partition_version_key =
versioned::partition_version_key({instance_id, partition_id});
versioned_put(txn.get(), partition_version_key, ver_val);
LOG(INFO) << "put versioned partition key=" << hex(partition_version_key)
<< ", txn_id=" << txn_id;
}
response->add_table_ids(table_id);
response->add_partition_ids(partition_id);
response->add_versions(new_version);
}
// Save table versions
for (auto& i : table_id_tablet_ids) {
if (is_versioned_read) {
// Read the table version, to build the operation log visible version range.
err = meta_reader.get_table_version(txn.get(), i.first, nullptr, true);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get table version, err=" << err << " table_id=" << i.first;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
}
update_table_version(txn.get(), instance_id, db_id, i.first);
commit_txn_log.add_table_ids(i.first);
}
LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();
// Update txn_info
txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
auto now_time = system_clock::now();
uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
code = MetaServiceCode::UNDEFINED_ERR;
msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id);
LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
<< " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time;
return;
}
txn_info.set_commit_time(commit_time);
txn_info.set_finish_time(commit_time);
if (request->has_commit_attachment()) {
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
}
txn_info.set_versioned_write(is_versioned_write);
txn_info.set_versioned_read(is_versioned_read);
LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
info_val.clear();
if (!txn_info.SerializeToString(&info_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(info_key, info_val);
LOG(INFO) << "put info_key=" << hex(info_key) << " txn_id=" << txn_id;
// Batch get existing versioned tablet stats if needed
std::unordered_map<int64_t, TabletStatsPB> existing_versioned_stats;
if (is_versioned_write && !tablet_stats.empty()) {
internal_get_load_tablet_stats_batch(code, msg, meta_reader, txn.get(), instance_id,
tablet_ids, &existing_versioned_stats);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "batch get versioned tablet stats failed, code=" << code
<< " msg=" << msg << " txn_id=" << txn_id;
return;
}
LOG(INFO) << "batch get " << existing_versioned_stats.size()
<< " versioned tablet stats, txn_id=" << txn_id;
}
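// For versioned writes, merge the per-tablet delta into the previously read
// versioned stats and write the result back as a versioned document.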
// Update stats of affected tablet
for (auto& [tablet_id, stats] : tablet_stats) {
DCHECK(tablet_ids.count(tablet_id));
auto& tablet_idx = tablet_ids[tablet_id];
StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_id};
update_tablet_stats(info, stats, txn, code, msg);
if (code != MetaServiceCode::OK) return;
if (is_versioned_write) {
TabletStatsPB stats_pb = existing_versioned_stats[tablet_id];
merge_tablet_stats(stats_pb, stats);
std::string stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id});
if (!versioned::document_put(txn.get(), stats_key, std::move(stats_pb))) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = "failed to serialize versioned tablet stats";
LOG(WARNING) << msg << " tablet_id=" << tablet_id << " txn_id=" << txn_id;
return;
}
LOG(INFO) << "put versioned tablet stats key=" << hex(stats_key)
<< " tablet_id=" << tablet_id << " txn_id=" << txn_id;
}
}
// Remove tmp rowset meta
for (auto& [k, _] : tmp_rowsets_meta) {
txn->remove(k);
LOG(INFO) << "remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id;
}
const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
LOG(INFO) << "remove running_key=" << hex(running_key) << " txn_id=" << txn_id;
txn->remove(running_key);
RecycleTxnPB recycle_pb;
recycle_pb.set_creation_time(commit_time);
recycle_pb.set_label(txn_info.label());
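// With versioned write the recycle info is embedded in the commit operation log;
// otherwise it is stored under a standalone recycle_txn key.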
if (is_versioned_write) {
commit_txn_log.mutable_recycle_txn()->Swap(&recycle_pb);
std::string log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(meta_reader.min_read_version());
}
operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
versioned::blob_put(txn.get(), log_key, operation_log);
LOG(INFO) << "put commit txn operation log, key=" << hex(log_key)
<< " txn_id=" << txn_id;
} else {
std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
std::string recycle_val;
if (!recycle_pb.SerializeToString(&recycle_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(recycle_key, recycle_val);
LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key)
<< " txn_id=" << txn_id;
}
if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
}
if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
update_streaming_job_meta(code, msg, instance_id, request, txn.get(), db_id);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "update_streaming_job_meta failed, txn_id=" << txn_id
<< " code=" << code << " msg=" << msg;
return;
}
}
LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
<< " del_size=" << txn->delete_bytes() << " num_put_keys=" << txn->num_put_keys()
<< " num_del_keys=" << txn->num_del_keys()
<< " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id;
TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_immediately::before_commit", &err, &code);
// Finally we are done...
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_CONFLICT) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter << 1;
}
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
// calculate table stats from tablets stats
std::map<int64_t /*table_id*/, TableStats> table_stats;
std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
request->base_tablet_ids().end());
calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
for (const auto& pair : table_stats) {
TableStatsPB* stats_pb = response->add_table_stats();
auto table_id = pair.first;
auto stats = pair.second;
get_pb_from_tablestats(stats, stats_pb);
stats_pb->set_table_id(table_id);
VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
<< " table_id=" << table_id
<< " updated_row_count=" << stats_pb->updated_row_count();
}
response->mutable_txn_info()->CopyFrom(txn_info);
TEST_SYNC_POINT_CALLBACK("commit_txn_immediately::finish", &code);
break;
} while (true);
} // end commit_txn_immediately
// Rewrite TabletIndexPB to fill in db_id; for historical reasons the stored
// TabletIndexPB may be missing db_id.
void repair_tablet_index(
std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string& msg,
const std::string& instance_id, int64_t db_id, int64_t txn_id,
const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta,
bool is_versioned_write) {
std::stringstream ss;
std::vector<std::string> tablet_idx_keys;
for (auto& [_, i] : tmp_rowsets_meta) {
tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()}));
}
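// Process the tablet index keys in batches of max_tablet_index_num_per_batch,
// each batch in its own transaction.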
for (size_t i = 0; i < tablet_idx_keys.size(); i += config::max_tablet_index_num_per_batch) {
size_t end = (i + config::max_tablet_index_num_per_batch) > tablet_idx_keys.size()
? tablet_idx_keys.size()
: i + config::max_tablet_index_num_per_batch;
const std::vector<std::string> sub_tablet_idx_keys(tablet_idx_keys.begin() + i,
tablet_idx_keys.begin() + end);
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
std::vector<std::optional<std::string>> tablet_idx_values;
// Batch get with snapshot read disabled
err = txn->batch_get(&tablet_idx_values, sub_tablet_idx_keys,
Transaction::BatchGetOptions(false));
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get tablet table index ids, err=" << err;
msg = ss.str();
LOG(WARNING) << msg << " txn_id=" << txn_id;
return;
}
DCHECK(tablet_idx_values.size() <= config::max_tablet_index_num_per_batch);
for (size_t j = 0; j < sub_tablet_idx_keys.size(); j++) {
if (!tablet_idx_values[j].has_value()) [[unlikely]] {
// The value must exist
code = MetaServiceCode::KV_TXN_GET_ERR;
ss << "failed to get tablet table index ids, err=not found"
<< " key=" << hex(tablet_idx_keys[j]);
msg = ss.str();
LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
return;
}
TabletIndexPB tablet_idx_pb;
if (!tablet_idx_pb.ParseFromString(tablet_idx_values[j].value())) [[unlikely]] {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "malformed tablet index value key=" << hex(tablet_idx_keys[j])
<< " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
if (!tablet_idx_pb.has_db_id()) {
tablet_idx_pb.set_db_id(db_id);
std::string idx_val;
if (!tablet_idx_pb.SerializeToString(&idx_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize tablet index value key=" << hex(tablet_idx_keys[j])
<< " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
txn->put(sub_tablet_idx_keys[j], idx_val);
LOG(INFO) << " repair tablet index txn_id=" << txn_id
<< " tablet_idx_pb:" << tablet_idx_pb.ShortDebugString()
<< " key=" << hex(sub_tablet_idx_keys[j]);
if (is_versioned_write) {
std::string versioned_tablet_idx_key =
versioned::tablet_index_key({instance_id, tablet_idx_pb.tablet_id()});
std::string versioned_tablet_inverted_idx_key =
versioned::tablet_inverted_index_key(
{instance_id, db_id, tablet_idx_pb.table_id(),
tablet_idx_pb.index_id(), tablet_idx_pb.partition_id(),
tablet_idx_pb.tablet_id()});
txn->put(versioned_tablet_idx_key, idx_val);
txn->put(versioned_tablet_inverted_idx_key, "");
LOG(INFO) << "repair tablet index and inverted index, txn_id=" << txn_id
<< " tablet_id=" << tablet_idx_pb.tablet_id()
<< " index_key=" << hex(versioned_tablet_idx_key)
<< " inverted_index_key=" << hex(versioned_tablet_inverted_idx_key);
}
}
}
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
}
code = MetaServiceCode::OK;
}
void MetaServiceImpl::commit_txn_eventually(
const CommitTxnRequest* request, CommitTxnResponse* response, MetaServiceCode& code,
std::string& msg, const std::string& instance_id, int64_t db_id,
const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta,
KVStats& stats) {
StopWatch sw;
DORIS_CLOUD_DEFER {
if (config::use_detailed_metrics && !instance_id.empty()) {
g_bvar_ms_commit_txn_eventually.put(instance_id, sw.elapsed_us());
}
};
std::stringstream ss;
TxnErrorCode err = TxnErrorCode::TXN_OK;
int64_t txn_id = request->txn_id();
bool is_versioned_write = is_version_write_enabled(instance_id);
bool is_versioned_read = is_version_read_enabled(instance_id);
do {
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually:begin", &txn_id);
int64_t last_pending_txn_id = 0;
std::unique_ptr<Transaction> txn;
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
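// Accumulate KV read/write statistics from this transaction when it goes out of scope.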
DORIS_CLOUD_DEFER {
if (txn == nullptr) return;
stats.get_bytes += txn->get_bytes();
stats.put_bytes += txn->put_bytes();
stats.del_bytes += txn->delete_bytes();
stats.get_counter += txn->num_get_keys();
stats.put_counter += txn->num_put_keys();
stats.del_counter += txn->num_del_keys();
};
CloneChainReader meta_reader(instance_id, resource_mgr_.get());
AnnotateTag txn_tag("txn_id", txn_id);
// tablet_id -> {table/index/partition}_id
std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
std::ranges::ref_view(tmp_rowsets_meta) |
std::ranges::views::transform(
[](const auto& pair) { return pair.second.tablet_id(); }));
if (!is_versioned_read) {
std::tie(code, msg) =
get_tablet_indexes(txn.get(), &tablet_ids, instance_id, acquired_tablet_ids);
if (code != MetaServiceCode::OK) {
return;
}
} else {
err = meta_reader.get_tablet_indexes(txn.get(), acquired_tablet_ids, &tablet_ids);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet indexes, err={}", err);
LOG_WARNING(msg);
return;
}
}
bool need_repair_tablet_idx =
std::any_of(tablet_ids.begin(), tablet_ids.end(),
[](const auto& pair) { return !pair.second.has_db_id(); });
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::need_repair_tablet_idx",
&need_repair_tablet_idx);
if (need_repair_tablet_idx) {
stats.get_bytes += txn->get_bytes();
stats.get_counter += txn->num_get_keys();
txn.reset();
repair_tablet_index(txn_kv_, code, msg, instance_id, db_id, txn_id, tmp_rowsets_meta,
is_versioned_write);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "repair_tablet_index failed, txn_id=" << txn_id << " code=" << code;
return;
}
continue;
}
CommitTxnLogPB commit_txn_log;
commit_txn_log.set_txn_id(txn_id);
commit_txn_log.set_db_id(db_id);
// partition_id -> {db_id, table_id}
std::unordered_map<int64_t, std::tuple<int64_t, int64_t>> partition_indexes;
for (auto& [_, i] : tmp_rowsets_meta) {
int64_t tablet_id = i.tablet_id();
int64_t table_id = tablet_ids[tablet_id].table_id();
int64_t partition_id = i.partition_id();
partition_indexes.insert({partition_id, {db_id, table_id}});
commit_txn_log.mutable_tablet_to_partition_map()->insert({tablet_id, partition_id});
}
std::unordered_map<int64_t, int64_t> versions;
if (!is_versioned_read) {
std::tie(code, msg) = get_partition_versions(txn.get(), &versions, &last_pending_txn_id,
instance_id, partition_indexes);
if (code != MetaServiceCode::OK) {
return;
}
} else {
std::vector<int64_t> partition_ids = to_container<std::vector<int64_t>>(
std::ranges::ref_view(partition_indexes) | std::ranges::views::keys);
err = meta_reader.get_partition_versions(txn.get(), partition_ids, &versions,
&last_pending_txn_id);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get partition versions, err={}", err);
LOG_WARNING(msg);
return;
}
}
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::last_pending_txn_id",
&last_pending_txn_id);
if (last_pending_txn_id > 0) {
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::advance_last_pending_txn_id",
&last_pending_txn_id);
stats.get_bytes += txn->get_bytes();
stats.get_counter += txn->num_get_keys();
txn.reset();
std::shared_ptr<TxnLazyCommitTask> task =
txn_lazy_committer_->submit(instance_id, last_pending_txn_id);
std::tie(code, msg) = task->wait();
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
<< " code=" << code << " msg=" << msg;
return;
}
last_pending_txn_id = 0;
// There may be concurrent commit_txn_eventually calls, so continue the loop to make
// sure the partition VersionPB has no pending txn_id left.
continue;
}
record_txn_commit_stats(txn.get(), instance_id, partition_indexes.size(), tablet_ids.size(),
txn_id);
std::string info_val;
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
<< " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
TxnInfoPB txn_info;
if (!txn_info.ParseFromString(info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString();
DCHECK(txn_info.txn_id() == txn_id);
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
code = MetaServiceCode::TXN_ALREADY_ABORTED;
ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
if (request->has_is_2pc() && request->is_2pc()) {
code = MetaServiceCode::TXN_ALREADY_VISIBLE;
ss << "transaction [" << txn_id << "] is already visible, not pre-committed.";
msg = ss.str();
LOG(INFO) << msg;
response->mutable_txn_info()->CopyFrom(txn_info);
return;
}
code = MetaServiceCode::OK;
ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(INFO) << msg;
response->mutable_txn_info()->CopyFrom(txn_info);
return;
}
if (request->has_is_2pc() && request->is_2pc() &&
txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) {
code = MetaServiceCode::TXN_INVALID_STATUS;
ss << "transaction is prepare, not pre-committed: db_id=" << db_id << " txn_id"
<< txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
auto now_time = system_clock::now();
uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
code = MetaServiceCode::UNDEFINED_ERR;
msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id);
LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
<< " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time;
return;
}
txn_info.set_commit_time(commit_time);
txn_info.set_finish_time(commit_time);
if (request->has_commit_attachment()) {
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
}
DCHECK(txn_info.status() != TxnStatusPB::TXN_STATUS_COMMITTED);
// set status TXN_STATUS_COMMITTED not TXN_STATUS_VISIBLE !!!
// lazy commit task will advance txn to make txn visible
txn_info.set_status(TxnStatusPB::TXN_STATUS_COMMITTED);
txn_info.set_versioned_write(is_versioned_write);
txn_info.set_versioned_read(is_versioned_read);
LOG(INFO) << "after update txn_id= " << txn_id
<< " txn_info=" << txn_info.ShortDebugString();
info_val.clear();
if (!txn_info.SerializeToString(&info_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(info_key, info_val);
LOG(INFO) << "put info_key=" << hex(info_key) << " txn_id=" << txn_id;
if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
}
if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
update_streaming_job_meta(code, msg, instance_id, request, txn.get(), db_id);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "update_streaming_job_meta failed, txn_id=" << txn_id
<< " code=" << code << " msg=" << msg;
return;
}
}
// save versions for partition
int64_t version_update_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
response->set_version_update_time_ms(version_update_time_ms);
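// Unlike the immediate path, the partition version is not advanced here; the txn_id is
// recorded as a pending txn in the VersionPB, and the lazy commit task advances it later.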
for (auto& [partition_id, version] : versions) {
std::string ver_val;
VersionPB version_pb;
version_pb.add_pending_txn_ids(txn_id);
version_pb.set_update_time_ms(version_update_time_ms);
if (version > 1) {
version_pb.set_version(version);
}
if (!version_pb.SerializeToString(&ver_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize version_pb when saving, txn_id=" << txn_id
<< " partition_id=" << partition_id;
msg = ss.str();
return;
}
auto [db_id, table_id] = partition_indexes[partition_id];
std::string version_key =
partition_version_key({instance_id, db_id, table_id, partition_id});
txn->put(version_key, ver_val);
LOG(INFO) << "put partition_version_key=" << hex(version_key) << " version:" << version
<< " txn_id=" << txn_id << " partition_id=" << partition_id
<< " update_time=" << version_update_time_ms;
VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
<< " partition_id=" << partition_id << " version=" << version;
if (is_versioned_write) {
std::string partition_version_key =
versioned::partition_version_key({instance_id, partition_id});
versioned_put(txn.get(), partition_version_key, ver_val);
LOG(INFO) << "put versioned partition key=" << hex(partition_version_key)
<< ", txn_id=" << txn_id;
}
int64_t new_version = version + 1;
commit_txn_log.mutable_partition_version_map()->insert({partition_id, new_version});
response->add_table_ids(table_id);
response->add_partition_ids(partition_id);
response->add_versions(new_version);
}
// table_id -> tablets_ids
std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
for (auto& [tablet_id, tablet_idx] : tablet_ids) {
table_id_tablet_ids[tablet_idx.table_id()].push_back(tablet_id);
}
process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
return;
}
// Save table versions
for (auto& i : table_id_tablet_ids) {
if (is_versioned_read) {
// Read the table version, to build the operation log visible version range.
err = meta_reader.get_table_version(txn.get(), i.first, nullptr, true);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get table version, err=" << err << " table_id=" << i.first;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
}
update_table_version(txn.get(), instance_id, db_id, i.first);
commit_txn_log.add_table_ids(i.first);
}
if (is_versioned_write) {
RecycleTxnPB* recycle_txn = commit_txn_log.mutable_recycle_txn();
recycle_txn->set_label(txn_info.label());
std::string log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(meta_reader.min_read_version());
}
operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
versioned::blob_put(txn.get(), log_key, operation_log);
LOG(INFO) << "put commit txn operation log, key=" << hex(log_key)
<< " txn_id=" << txn_id;
}
LOG(INFO) << "put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes()
<< " num_put_keys=" << txn->num_put_keys()
<< " num_del_keys=" << txn->num_del_keys()
<< " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_CONFLICT) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter << 1;
}
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_eventually::txn_lazy_committer_submit",
&txn_id);
std::shared_ptr<TxnLazyCommitTask> task = txn_lazy_committer_->submit(instance_id, txn_id);
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::txn_lazy_committer_wait", &txn_id);
std::pair<MetaServiceCode, std::string> ret = task->wait();
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::task->wait", &ret);
if (ret.first != MetaServiceCode::OK) {
LOG(WARNING) << "txn lazy commit failed txn_id=" << txn_id << " code=" << ret.first
<< " msg=" << ret.second;
}
std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats
for (auto& [_, i] : tmp_rowsets_meta) {
// Accumulate affected rows
auto& stats = tablet_stats[i.tablet_id()];
stats.data_size += i.total_disk_size();
stats.num_rows += i.num_rows();
++stats.num_rowsets;
stats.num_segs += i.num_segments();
stats.index_size += i.index_disk_size();
stats.segment_size += i.data_disk_size();
}
// calculate table stats from tablets stats
std::map<int64_t /*table_id*/, TableStats> table_stats;
std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
request->base_tablet_ids().end());
calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
for (const auto& pair : table_stats) {
TableStatsPB* stats_pb = response->add_table_stats();
auto table_id = pair.first;
auto stats = pair.second;
get_pb_from_tablestats(stats, stats_pb);
stats_pb->set_table_id(table_id);
VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
<< " table_id=" << table_id
<< " updated_row_count=" << stats_pb->updated_row_count();
}
// txn set visible for fe callback
txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
response->mutable_txn_info()->CopyFrom(txn_info);
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::finish", &code, &txn_id);
break;
} while (true);
}
/**
 * This process is generally the same as commit_txn; the difference is that
 * the partition version is incremented once for each sub txn that writes to the partition.
*
* One example:
* Suppose the table, partition, tablet and version info is:
* --------------------------------------------
* | table | partition | tablet | version |
* --------------------------------------------
* | t1 | t1_p1 | t1_p1.1 | 1 |
* | t1 | t1_p1 | t1_p1.2 | 1 |
* | t1 | t1_p2 | t1_p2.1 | 2 |
* | t2 | t2_p3 | t2_p3.1 | 3 |
* | t2 | t2_p4 | t2_p4.1 | 4 |
* --------------------------------------------
*
* Now we commit a txn with 3 sub txns and the tablets are:
* sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1
* sub_txn2: t2_p3.1
* sub_txn3: t1_p1.1, t1_p1.2
* When commit, the partitions version will be:
* sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3)
* sub_txn2: t2_p3(3 -> 4)
* sub_txn3: t1_p1(2 -> 3)
* After commit, the partitions version will be:
* t1: t1_p1(3), t1_p2(3)
* t2: t2_p3(4), t2_p4(4)
*/
void MetaServiceImpl::commit_txn_with_sub_txn(const CommitTxnRequest* request,
CommitTxnResponse* response, MetaServiceCode& code,
std::string& msg, const std::string& instance_id,
int64_t db_id, KVStats& stats) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
auto sub_txn_infos = request->sub_txn_infos();
std::map<int64_t, std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>>
sub_txn_to_tmp_rowsets_meta;
for (const auto& sub_txn_info : sub_txn_infos) {
auto sub_txn_id = sub_txn_info.sub_txn_id();
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowsets_meta;
scan_tmp_rowset(instance_id, sub_txn_id, txn_kv_, code, msg, &tmp_rowsets_meta, &stats);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id
<< ", sub_txn_id=" << sub_txn_id << " code=" << code;
return;
}
sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id, std::move(tmp_rowsets_meta));
}
do {
TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn:begin", &txn_id);
// Create a txn to read the txn info and write the commit results
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
DORIS_CLOUD_DEFER {
if (txn == nullptr) return;
stats.get_bytes += txn->get_bytes();
stats.put_bytes += txn->put_bytes();
stats.del_bytes += txn->delete_bytes();
stats.get_counter += txn->num_get_keys();
stats.put_counter += txn->num_put_keys();
stats.del_counter += txn->num_del_keys();
};
// Get txn info with db_id and txn_id
std::string info_val; // Will be reused when saving updated txn
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
ss << "transaction [" << txn_id << "] not found, db_id=" << db_id;
} else {
ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
<< " err=" << err;
}
msg = ss.str();
LOG(WARNING) << msg;
return;
}
TxnInfoPB txn_info;
if (!txn_info.ParseFromString(info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
// TODO: do more check like txn state
DCHECK(txn_info.txn_id() == txn_id);
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
code = MetaServiceCode::TXN_ALREADY_ABORTED;
ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
code = MetaServiceCode::OK;
ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(INFO) << msg;
response->mutable_txn_info()->CopyFrom(txn_info);
return;
}
LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString();
AnnotateTag txn_tag("txn_id", txn_id);
bool is_versioned_write = is_version_write_enabled(instance_id);
bool is_versioned_read = is_version_read_enabled(instance_id);
CloneChainReader meta_reader(instance_id, resource_mgr_.get());
// Prepare rowset meta and new_versions
std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
std::vector<int64_t> acquired_tablet_ids;
for (const auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
for (const auto& [_, i] : tmp_rowsets_meta) {
acquired_tablet_ids.push_back(i.tablet_id());
}
}
if (!is_versioned_read) {
// Read tablet indexes in batch.
std::tie(code, msg) =
get_tablet_indexes(txn.get(), &tablet_ids, instance_id, acquired_tablet_ids);
if (code != MetaServiceCode::OK) {
return;
}
} else {
TxnErrorCode err =
meta_reader.get_tablet_indexes(txn.get(), acquired_tablet_ids, &tablet_ids);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet indexes, err={}", err);
LOG_WARNING(msg);
return;
}
}
// partition_id -> version
std::unordered_map<int64_t, int64_t> new_versions;
std::unordered_map<int64_t, std::tuple<int64_t, int64_t>> partition_indexes;
for (auto& [tablet_id, tablet_idx] : tablet_ids) {
int64_t table_id = tablet_idx.table_id();
int64_t partition_id = tablet_idx.partition_id();
partition_indexes.insert({partition_id, {db_id, table_id}});
}
int64_t last_pending_txn_id = 0;
if (!is_versioned_read) {
std::tie(code, msg) = get_partition_versions(
txn.get(), &new_versions, &last_pending_txn_id, instance_id, partition_indexes);
if (code != MetaServiceCode::OK) {
return;
}
} else {
std::vector<int64_t> partition_ids = to_container<std::vector<int64_t>>(
std::ranges::ref_view(partition_indexes) | std::ranges::views::keys);
err = meta_reader.get_partition_versions(txn.get(), partition_ids, &new_versions,
&last_pending_txn_id);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get partition versions, err={}", err);
LOG_WARNING(msg);
return;
}
}
if (last_pending_txn_id > 0) {
stats.get_bytes += txn->get_bytes();
stats.get_counter += txn->num_get_keys();
txn.reset();
TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn::advance_last_pending_txn_id",
&last_pending_txn_id);
std::shared_ptr<TxnLazyCommitTask> task =
txn_lazy_committer_->submit(instance_id, last_pending_txn_id);
std::tie(code, msg) = task->wait();
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
<< " code=" << code << " msg=" << msg;
return;
}
last_pending_txn_id = 0;
continue;
}
CommitTxnLogPB commit_txn_log;
commit_txn_log.set_txn_id(txn_id);
commit_txn_log.set_db_id(db_id);
int64_t rowsets_visible_ts_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
// <tablet_id, version> -> rowset meta
std::vector<std::pair<std::tuple<int64_t, int64_t>, RowsetMetaCloudPB>> rowsets;
std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats
rowsets.reserve(sub_txn_to_tmp_rowsets_meta.size() * 10); // rough estimate
for (const auto& sub_txn_info : sub_txn_infos) {
auto sub_txn_id = sub_txn_info.sub_txn_id();
auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id];
std::unordered_map<int64_t, int64_t> partition_id_to_version;
for (auto& [_, i] : tmp_rowsets_meta) {
int64_t tablet_id = i.tablet_id();
int64_t table_id = tablet_ids[tablet_id].table_id();
int64_t partition_id = i.partition_id();
if (new_versions.count(partition_id) == 0) [[unlikely]] {
// This should never happen.
code = MetaServiceCode::UNDEFINED_ERR;
ss << "failed to get partition version key, the target version does not exist in "
"new_versions."
<< " txn_id=" << txn_id << ", db_id=" << db_id << ", table_id=" << table_id
<< ", partition_id=" << partition_id;
msg = ss.str();
LOG(ERROR) << msg;
return;
}
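// Each sub txn that writes to a partition bumps that partition's version once; rowsets
// of the same partition within the same sub txn share the bumped version.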
// Update rowset version
int64_t new_version = new_versions[partition_id];
if (partition_id_to_version.count(partition_id) == 0) {
new_versions[partition_id] = new_version + 1;
new_version = new_versions[partition_id];
partition_id_to_version[partition_id] = new_version;
}
i.set_start_version(new_version);
i.set_end_version(new_version);
i.set_visible_ts_ms(rowsets_visible_ts_ms);
LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id
<< ", sub_txn_id=" << sub_txn_id << ", table_id=" << table_id
<< ", partition_id=" << partition_id << ", tablet_id=" << tablet_id
<< ", new_version=" << new_version;
// Accumulate affected rows
auto& stats = tablet_stats[tablet_id];
stats.data_size += i.total_disk_size();
stats.num_rows += i.num_rows();
++stats.num_rowsets;
stats.num_segs += i.num_segments();
stats.index_size += i.index_disk_size();
stats.segment_size += i.data_disk_size();
rowsets.emplace_back(std::make_tuple(tablet_id, i.end_version()), std::move(i));
commit_txn_log.mutable_tablet_to_partition_map()->insert({tablet_id, partition_id});
} // for tmp_rowsets_meta
}
std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
for (auto& [tablet_id, tablet_idx] : tablet_ids) {
int64_t table_id = tablet_idx.table_id();
table_id_tablet_ids[table_id].push_back(tablet_id);
}
process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
return;
}
// Save rowset meta
for (auto& i : rowsets) {
auto [tablet_id, version] = i.first;
std::string rowset_key = meta_rowset_key({instance_id, tablet_id, version});
std::string val;
if (!i.second.SerializeToString(&val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
msg = ss.str();
return;
}
size_t rowset_size = rowset_key.size() + val.size();
txn->put(rowset_key, val);
LOG(INFO) << "put rowset_key=" << hex(rowset_key) << " txn_id=" << txn_id
<< " rowset_size=" << rowset_size;
if (is_versioned_write) {
auto& rowset_meta = i.second;
std::string meta_rowset_key = versioned::meta_rowset_key(
{instance_id, rowset_meta.tablet_id(), rowset_meta.rowset_id_v2()});
if (config::enable_recycle_rowset_strip_key_bounds) {
doris::RowsetMetaCloudPB rowset_meta_copy = rowset_meta;
// Strip key bounds to shrink operation log for ts compaction recycle entries
rowset_meta_copy.clear_segments_key_bounds();
rowset_meta_copy.clear_segments_key_bounds_truncated();
blob_put(txn.get(), meta_rowset_key, rowset_meta_copy.SerializeAsString(), 0);
} else {
blob_put(txn.get(), meta_rowset_key, rowset_meta.SerializeAsString(), 0);
}
LOG(INFO) << "put versioned meta_rowset_key=" << hex(meta_rowset_key);
std::string versioned_rowset_key =
versioned::meta_rowset_load_key({instance_id, tablet_id, version});
if (!versioned::document_put(txn.get(), versioned_rowset_key,
std::move(i.second))) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to put versioned rowset meta, txn_id=" << txn_id
<< " key=" << hex(versioned_rowset_key);
msg = ss.str();
LOG(WARNING) << msg;
return;
}
LOG(INFO) << "put versioned rowset meta key=" << hex(versioned_rowset_key)
<< ", txn_id=" << txn_id;
}
}
// Save versions
for (auto& [partition_id, new_version] : new_versions) {
std::string ver_val;
VersionPB version_pb;
version_pb.set_version(new_version);
if (!version_pb.SerializeToString(&ver_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize version_pb when saving, txn_id=" << txn_id;
msg = ss.str();
return;
}
auto [db_id, table_id] = partition_indexes[partition_id];
std::string version_key =
partition_version_key({instance_id, db_id, table_id, partition_id});
txn->put(version_key, ver_val);
LOG(INFO) << "put partition_version_key=" << hex(version_key)
<< " version:" << new_version << " txn_id=" << txn_id
<< " partition_id=" << partition_id;
VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
<< " partition_id=" << partition_id << " version=" << new_version;
if (is_versioned_write) {
std::string partition_version_key =
versioned::partition_version_key({instance_id, partition_id});
versioned_put(txn.get(), partition_version_key, ver_val);
LOG(INFO) << "put versioned partition key=" << hex(partition_version_key)
<< ", txn_id=" << txn_id;
}
commit_txn_log.mutable_partition_version_map()->insert({partition_id, new_version});
response->add_table_ids(table_id);
response->add_partition_ids(partition_id);
response->add_versions(new_version);
}
// Save table versions
for (auto& i : table_id_tablet_ids) {
if (is_versioned_read) {
// Read the table version, to build the operation log visible version range.
TxnErrorCode err = meta_reader.get_table_version(txn.get(), i.first, nullptr, true);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get table version, err=" << err << " table_id=" << i.first;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
}
update_table_version(txn.get(), instance_id, db_id, i.first);
commit_txn_log.add_table_ids(i.first);
}
LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();
// Update txn_info
txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
auto now_time = system_clock::now();
uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
code = MetaServiceCode::UNDEFINED_ERR;
msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id);
LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
<< " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time;
return;
}
txn_info.set_commit_time(commit_time);
txn_info.set_finish_time(commit_time);
if (request->has_commit_attachment()) {
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
}
txn_info.set_versioned_write(is_versioned_write);
txn_info.set_versioned_read(is_versioned_read);
LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
info_val.clear();
if (!txn_info.SerializeToString(&info_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(info_key, info_val);
LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;
// Batch get existing versioned tablet stats if needed
std::unordered_map<int64_t, TabletStatsPB> existing_versioned_stats;
if (is_versioned_write && !tablet_stats.empty()) {
internal_get_load_tablet_stats_batch(code, msg, meta_reader, txn.get(), instance_id,
tablet_ids, &existing_versioned_stats);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "batch get versioned tablet stats failed, code=" << code
<< " msg=" << msg << " txn_id=" << txn_id;
return;
}
LOG(INFO) << "batch get " << existing_versioned_stats.size()
<< " versioned tablet stats, txn_id=" << txn_id;
}
// Update stats of affected tablet
for (auto& [tablet_id, stats] : tablet_stats) {
DCHECK(tablet_ids.count(tablet_id));
auto& tablet_idx = tablet_ids[tablet_id];
StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_id};
update_tablet_stats(info, stats, txn, code, msg);
if (code != MetaServiceCode::OK) return;
if (is_versioned_write) {
TabletStatsPB stats_pb = existing_versioned_stats[tablet_id];
merge_tablet_stats(stats_pb, stats);
std::string stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id});
if (!versioned::document_put(txn.get(), stats_key, std::move(stats_pb))) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = "failed to serialize versioned tablet stats";
LOG(WARNING) << msg << " tablet_id=" << tablet_id << " txn_id=" << txn_id;
return;
}
LOG(INFO) << "put versioned tablet stats key=" << hex(stats_key)
<< " tablet_id=" << tablet_id << " txn_id=" << txn_id;
}
}
// Remove tmp rowset meta
for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
for (auto& [k, _] : tmp_rowsets_meta) {
txn->remove(k);
LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id;
}
}
const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" << txn_id;
txn->remove(running_key);
std::string recycle_val;
std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
RecycleTxnPB recycle_pb;
recycle_pb.set_creation_time(commit_time);
recycle_pb.set_label(txn_info.label());
if (is_versioned_write) {
commit_txn_log.mutable_recycle_txn()->Swap(&recycle_pb);
std::string log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(meta_reader.min_read_version());
}
operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
versioned::blob_put(txn.get(), log_key, operation_log);
LOG(INFO) << "put commit txn operation log key=" << hex(recycle_key)
<< " txn_id=" << txn_id;
} else {
if (!recycle_pb.SerializeToString(&recycle_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->put(recycle_key, recycle_val);
LOG(INFO) << "commit_txn put recycle_txn_key key=" << hex(recycle_key)
<< " txn_id=" << txn_id;
}
LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
<< " del_size=" << txn->delete_bytes() << " num_put_keys=" << txn->num_put_keys()
<< " num_del_keys=" << txn->num_del_keys()
<< " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id;
TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_with_sub_txn::before_commit", &err, &code);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_CONFLICT) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter << 1;
}
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn with sub txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
// calculate table stats from tablets stats
std::map<int64_t /*table_id*/, TableStats> table_stats;
std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
request->base_tablet_ids().end());
calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
for (const auto& pair : table_stats) {
TableStatsPB* stats_pb = response->add_table_stats();
auto table_id = pair.first;
auto stats = pair.second;
get_pb_from_tablestats(stats, stats_pb);
stats_pb->set_table_id(table_id);
VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
<< " table_id=" << table_id
<< " updated_row_count=" << stats_pb->updated_row_count();
}
response->mutable_txn_info()->CopyFrom(txn_info);
TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn::finish", &code);
break;
} while (true);
} // end commit_txn_with_sub_txn
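// Fuzzy-test helper: returns true with probability
// cloud_txn_lazy_commit_fuzzy_possibility percent, forcing the lazy commit path.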
static bool force_txn_lazy_commit() {
static std::mt19937 rng(20250806 /* seed */);
static std::uniform_int_distribution<int> dist(1, 100);
return dist(rng) <= config::cloud_txn_lazy_commit_fuzzy_possibility;
}
void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
const CommitTxnRequest* request, CommitTxnResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(commit_txn, get, put, del);
if (!request->has_txn_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "invalid argument, missing txn id";
return;
}
int64_t txn_id = request->txn_id();
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id << " txn_id=" << txn_id;
return;
}
RPC_RATE_LIMIT(commit_txn)
int64_t db_id;
get_txn_db_id(txn_kv_.get(), instance_id, txn_id, code, msg, &db_id, &stats);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "get_txn_db_id failed, txn_id=" << txn_id << " code=" << code;
return;
}
if (request->has_is_txn_load() && request->is_txn_load()) {
commit_txn_with_sub_txn(request, response, code, msg, instance_id, db_id, stats);
return;
}
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowsets_meta;
scan_tmp_rowset(instance_id, txn_id, txn_kv_, code, msg, &tmp_rowsets_meta, &stats);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id << " code=" << code;
return;
}
TxnErrorCode err = TxnErrorCode::TXN_OK;
bool enable_txn_lazy_commit_feature =
(request->has_is_2pc() && !request->is_2pc() && request->has_enable_txn_lazy_commit() &&
request->enable_txn_lazy_commit() && config::enable_cloud_txn_lazy_commit);
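// Prefer committing immediately. Fall through to the lazy (eventual) commit path when
// lazy commit is enabled and the tmp rowset count exceeds txn_lazy_commit_rowsets_thresold,
// when the fuzzy test forces it, or when the immediate commit fails with TXN_BYTES_TOO_LARGE.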
while ((!enable_txn_lazy_commit_feature ||
(tmp_rowsets_meta.size() <= config::txn_lazy_commit_rowsets_thresold))) {
if (force_txn_lazy_commit()) {
LOG(INFO) << "fuzzy test force_txn_lazy_commit, txn_id=" << txn_id
<< " force_posibility=" << config::cloud_txn_lazy_commit_fuzzy_possibility;
break;
}
commit_txn_immediately(request, response, code, msg, instance_id, db_id, tmp_rowsets_meta,
err, stats);
if (MetaServiceCode::OK == code) {
return;
}
if (TxnErrorCode::TXN_BYTES_TOO_LARGE != err) {
return;
}
if (!enable_txn_lazy_commit_feature) {
if (err == TxnErrorCode::TXN_BYTES_TOO_LARGE) {
msg += ", likely due to committing too many tablets. "
"Please reduce the number of partitions involved in the load.";
}
return;
}
DCHECK(code != MetaServiceCode::OK);
DCHECK(enable_txn_lazy_commit_feature);
DCHECK(err == TxnErrorCode::TXN_BYTES_TOO_LARGE);
LOG(INFO) << "txn_id=" << txn_id << " fallthrough commit_txn_eventually";
break;
}
LOG(INFO) << "txn_id=" << txn_id << " commit_txn_eventually"
<< " tmp_rowsets_meta.size=" << tmp_rowsets_meta.size();
code = MetaServiceCode::OK;
msg.clear();
commit_txn_eventually(request, response, code, msg, instance_id, db_id, tmp_rowsets_meta,
stats);
}
static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* request,
Transaction* txn, TxnInfoPB& return_txn_info, std::stringstream& ss,
MetaServiceCode& code, std::string& msg) {
int64_t txn_id = request->txn_id();
std::string label = request->label();
int64_t db_id = request->db_id();
std::string info_key; // Will be used when saving updated txn
std::string info_val; // Will be reused when saving updated txn
TxnErrorCode err;
// TODO: split into two functions.
// There are two ways to abort a txn:
// 1. abort txn by txn_id
// 2. abort txn by label and db_id
if (txn_id > 0) {
VLOG_DEBUG << "abort_txn by txn_id, txn_id=" << txn_id;
//abort txn by txn id
// Get db id with txn id
std::string index_key;
std::string index_val;
// db_id not provided, we need to read it from storage.
if (db_id == 0) {
index_key = txn_index_key({instance_id, txn_id});
err = txn->get(index_key, &index_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
ss << "transaction [" << txn_id << "] not found";
} else {
ss << "failed to get txn info, txn_id=" << txn_id << " err=" << err;
}
msg = ss.str();
return;
}
TxnIndexPB index_pb;
if (!index_pb.ParseFromString(index_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_index_val"
<< " txn_id=" << txn_id;
msg = ss.str();
return;
}
DCHECK(index_pb.has_tablet_index() == true);
DCHECK(index_pb.tablet_index().has_db_id() == true);
db_id = index_pb.tablet_index().db_id();
}
// Get txn info with db_id and txn_id
info_key = txn_info_key({instance_id, db_id, txn_id});
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get txn_info, db_id=" << db_id << "txn_id=" << txn_id << "err=" << err;
msg = ss.str();
return;
}
if (!return_txn_info.ParseFromString(info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info db_id=" << db_id << "txn_id=" << txn_id;
msg = ss.str();
return;
}
DCHECK(return_txn_info.txn_id() == txn_id);
//check state is valid.
if (return_txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
code = MetaServiceCode::TXN_ALREADY_ABORTED;
ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id;
msg = ss.str();
return;
}
if (return_txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
code = MetaServiceCode::TXN_ALREADY_VISIBLE;
ss << "transaction [" << txn_id << "] is already VISIBLE, db_id=" << db_id;
msg = ss.str();
return;
}
} else {
VLOG_DEBUG << "abort_txn db_id and label, db_id=" << db_id << " label=" << label;
//abort txn by label.
std::string label_key = txn_label_key({instance_id, db_id, label});
std::string label_val;
err = txn->get(label_key, &label_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get() failed, label=" << label << " err=" << err;
msg = ss.str();
return;
}
// label index does not exist
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = MetaServiceCode::TXN_LABEL_NOT_FOUND;
ss << "label not found, db_id=" << db_id << " label=" << label << " err=" << err;
msg = ss.str();
return;
}
TxnLabelPB label_pb;
DCHECK(label_val.size() > VERSION_STAMP_LEN);
if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "txn_label_pb->ParseFromString() failed, label=" << label;
msg = ss.str();
return;
}
int64_t prepare_txn_id = 0;
// Find a txn in prepared/pre-committed state to abort
for (auto& cur_txn_id : label_pb.txn_ids()) {
std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id});
std::string cur_info_val;
err = txn->get(cur_info_key, &cur_info_val);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
std::stringstream ss;
ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " err=" << err;
msg = ss.str();
return;
}
TxnInfoPB cur_txn_info;
if (!cur_txn_info.ParseFromString(cur_info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
std::stringstream ss;
ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id;
msg = ss.str();
return;
}
VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
// For 2PC, also accept TxnStatusPB::TXN_STATUS_PRECOMMITTED.
if ((cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) ||
(cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PRECOMMITTED)) {
prepare_txn_id = cur_txn_id;
return_txn_info = std::move(cur_txn_info);
info_key = std::move(cur_info_key);
DCHECK_EQ(prepare_txn_id, return_txn_info.txn_id())
<< "prepare_txn_id=" << prepare_txn_id
<< " txn_id=" << return_txn_info.txn_id();
break;
}
}
if (prepare_txn_id == 0) {
code = MetaServiceCode::TXN_INVALID_STATUS;
std::stringstream ss;
ss << "running transaction not found, db_id=" << db_id << " label=" << label;
msg = ss.str();
return;
}
}
auto now_time = system_clock::now();
uint64_t finish_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
// Update txn_info
return_txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED);
return_txn_info.set_finish_time(finish_time);
request->has_reason() ? return_txn_info.set_reason(request->reason())
: return_txn_info.set_reason("User Abort");
if (request->has_commit_attachment()) {
return_txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
}
info_val.clear();
if (!return_txn_info.SerializeToString(&info_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_info when saving, txn_id=" << return_txn_info.txn_id();
msg = ss.str();
return;
}
LOG(INFO) << "check watermark conflict, txn_info=" << return_txn_info.ShortDebugString();
txn->put(info_key, info_val);
LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << return_txn_info.txn_id();
std::string running_key = txn_running_key({instance_id, db_id, return_txn_info.txn_id()});
txn->remove(running_key);
LOG(INFO) << "xxx remove running_key=" << hex(running_key)
<< " txn_id=" << return_txn_info.txn_id();
std::string recycle_key = recycle_txn_key({instance_id, db_id, return_txn_info.txn_id()});
std::string recycle_val;
RecycleTxnPB recycle_pb;
recycle_pb.set_creation_time(finish_time);
recycle_pb.set_label(return_txn_info.label());
if (!recycle_pb.SerializeToString(&recycle_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize recycle_pb, txn_id=" << return_txn_info.txn_id();
msg = ss.str();
return;
}
txn->put(recycle_key, recycle_val);
LOG(INFO) << "put recycle_txn_key=" << hex(recycle_key)
<< " txn_id=" << return_txn_info.txn_id();
}
void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
const AbortTxnRequest* request, AbortTxnResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(abort_txn, get, put, del);
// Get txn id
int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
std::string label = request->has_label() ? request->label() : "";
int64_t db_id = request->has_db_id() ? request->db_id() : -1;
if (txn_id < 0 && (label.empty() || db_id < 0)) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "invalid txn id and label, db_id=" << db_id << " txn_id=" << txn_id
<< " label=" << label;
msg = ss.str();
return;
}
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " label=" << label
<< " txn_id=" << txn_id;
msg = ss.str();
return;
}
RPC_RATE_LIMIT(abort_txn);
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "filed to txn_kv_->create_txn(), txn_id=" << txn_id << " label=" << label
<< " err=" << err;
msg = ss.str();
return;
}
TxnInfoPB txn_info;
_abort_txn(instance_id, request, txn.get(), txn_info, ss, code, msg);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn, txn_id=" << txn_info.txn_id() << " err=" << err;
msg = ss.str();
return;
}
response->mutable_txn_info()->CopyFrom(txn_info);
}
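// Get txn info by txn_id, or by (db_id, label) in which case the largest
// txn id recorded under the label is used. If db_id is not given, it is
// resolved from the txn index key.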
void MetaServiceImpl::get_txn(::google::protobuf::RpcController* controller,
const GetTxnRequest* request, GetTxnResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(get_txn, get);
int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
int64_t db_id = request->has_db_id() ? request->db_id() : -1;
std::string label = request->has_label() ? request->label() : "";
if (txn_id < 0 && label.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "invalid txn_id, it may be not given or set properly, txn_id=" << txn_id;
msg = ss.str();
return;
}
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
return;
}
RPC_RATE_LIMIT(get_txn)
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
if (!label.empty()) {
//step 1: get label
const std::string label_key = txn_label_key({instance_id, db_id, label});
std::string label_val;
err = txn->get(label_key, &label_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get failed(), err=" << err << " label=" << label;
msg = ss.str();
return;
}
LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label
<< " err=" << err;
// step 2: get txn info from label pb
TxnLabelPB label_pb;
if (err == TxnErrorCode::TXN_OK) {
if (label_val.size() <= VERSION_STAMP_LEN ||
!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "label_pb->ParseFromString() failed, txn_id=" << txn_id << " label=" << label;
msg = ss.str();
return;
}
for (auto& cur_txn_id : label_pb.txn_ids()) {
if (cur_txn_id > txn_id) {
txn_id = cur_txn_id;
}
}
} else {
code = MetaServiceCode::TXN_ID_NOT_FOUND;
ss << "Label [" << label << "] has not found";
msg = ss.str();
return;
}
}
// db_id not provided, read it from the txn index.
if (db_id < 0) {
const std::string index_key = txn_index_key({instance_id, txn_id});
std::string index_val;
err = txn->get(index_key, &index_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get db id with txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
TxnIndexPB index_pb;
if (!index_pb.ParseFromString(index_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_inf"
<< " txn_id=" << txn_id;
msg = ss.str();
return;
}
DCHECK(index_pb.has_tablet_index() == true);
DCHECK(index_pb.tablet_index().has_db_id() == true);
db_id = index_pb.tablet_index().db_id();
if (db_id <= 0) {
ss << "internal error: unexpected db_id " << db_id;
code = MetaServiceCode::UNDEFINED_ERR;
msg = ss.str();
return;
}
}
// Get txn info with db_id and txn_id
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
std::string info_val;
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get db id with db_id=" << db_id << " txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
TxnInfoPB txn_info;
if (!txn_info.ParseFromString(info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}
VLOG_DEBUG << "txn_info=" << txn_info.ShortDebugString();
DCHECK(txn_info.txn_id() == txn_id);
response->mutable_txn_info()->CopyFrom(txn_info);
}
// Get the current max txn id, used as a watermark for schema change etc.
void MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController* controller,
const GetCurrentMaxTxnRequest* request,
GetCurrentMaxTxnResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(get_current_max_txn_id, get);
// TODO: For auth
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return;
}
RPC_RATE_LIMIT(get_current_max_txn_id)
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
msg = "failed to create txn";
code = cast_as<ErrCategory::CREATE>(err);
return;
}
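// Dummy read on a key that is never written; presumably it only forces the
// transaction to establish a snapshot before get_read_version() is called.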
const std::string key = "schema change";
std::string val;
err = txn->get(key, &val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
std::stringstream ss;
ss << "txn->get() failed, err=" << err;
msg = ss.str();
return;
}
int64_t read_version = 0;
err = txn->get_read_version(&read_version);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
std::stringstream ss;
ss << "get read version failed, ret=" << err;
msg = ss.str();
return;
}
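// Txn ids are derived from FoundationDB versionstamps; the low 10 bits appear
// to carry the in-version batch order (see get_txn_id_from_fdb_ts), so
// read_version << 10 is an upper bound on every txn id committed so far.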
int64_t current_max_txn_id = read_version << 10;
VLOG_DEBUG << "read_version=" << read_version << " current_max_txn_id=" << current_max_txn_id;
response->set_current_max_txn_id(current_max_txn_id);
}
/**
* 1. Generate a sub_txn_id (from a TxnKv versionstamp written to the label key)
*
* The following steps are done in one kv txn:
* 2. Put the txn_index_key of the sub_txn_id
* 3. Delete the txn_label_key used to generate the sub_txn_id
* 4. Modify the txn state of the txn_id:
*     - Add the sub txn id to sub_txn_ids: the recycler uses it to recycle the txn_index_key
*     - Replace table_ids with the table ids in the request
*/
void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controller,
const BeginSubTxnRequest* request,
BeginSubTxnResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(begin_sub_txn, get, put, del);
int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num() : -1;
int64_t db_id = request->has_db_id() ? request->db_id() : -1;
auto& table_ids = request->table_ids();
std::string label = request->has_label() ? request->label() : "";
if (txn_id < 0 || sub_txn_num < 0 || db_id < 0 || table_ids.empty() || label.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_num=" << sub_txn_num
<< " db_id=" << db_id << ", label=" << label << ", table_ids=[";
for (auto table_id : table_ids) {
ss << table_id << ", ";
}
ss << "]";
msg = ss.str();
return;
}
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
return;
}
RPC_RATE_LIMIT(begin_sub_txn)
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id
<< " db_id=" << db_id;
msg = ss.str();
return;
}
const std::string label_key = txn_label_key({instance_id, db_id, label});
std::string label_val;
err = txn->get(label_key, &label_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get failed(), err=" << err << " label=" << label;
msg = ss.str();
return;
}
LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err;
// err == TXN_OK means the label already exists; this is likely a retried rpc.
if (err == TxnErrorCode::TXN_OK) {
label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN);
}
// Append a new versionstamp to the label value; TxnKv fills it in at commit time.
txn->atomic_set_ver_value(label_key, label_val);
LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "txn->commit failed(), label=" << label << " err=" << err;
msg = ss.str();
return;
}
stats.get_bytes += txn->get_bytes();
stats.put_bytes += txn->put_bytes();
stats.get_counter += txn->num_get_keys();
stats.put_counter += txn->num_put_keys();
// 2. Get sub txn id from version stamp
txn.reset();
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to create txn when get txn id, label=" << label << " err=" << err;
msg = ss.str();
return;
}
label_val.clear();
err = txn->get(label_key, &label_val);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get() failed, label=" << label << " err=" << err;
msg = ss.str();
return;
}
LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err;
// Generated by TxnKv system
int64_t sub_txn_id = 0;
int ret =
get_txn_id_from_fdb_ts(std::string_view(label_val).substr(
label_val.size() - VERSION_STAMP_LEN, label_val.size()),
&sub_txn_id);
if (ret != 0) {
code = MetaServiceCode::TXN_GEN_ID_ERR;
ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << ret;
msg = ss.str();
return;
}
LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " sub_txn_id=" << sub_txn_id
<< " txn_id=" << txn_id << " label_val.size()=" << label_val.size();
// write txn_index_key
const std::string index_key = txn_index_key({instance_id, sub_txn_id});
std::string index_val;
TxnIndexPB index_pb;
index_pb.mutable_tablet_index()->set_db_id(db_id);
index_pb.set_parent_txn_id(txn_id);
if (!index_pb.SerializeToString(&index_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_index_pb "
<< "label=" << label << " txn_id=" << txn_id;
msg = ss.str();
return;
}
// Get and update txn info with db_id and txn_id
std::string info_val; // Will be reused when saving updated txn
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
TxnInfoPB txn_info;
if (!txn_info.ParseFromString(info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
DCHECK(txn_info.txn_id() == txn_id);
if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
code = MetaServiceCode::TXN_INVALID_STATUS;
ss << "transaction status is " << txn_info.status() << " : db_id=" << db_id
<< " txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
if (txn_info.sub_txn_ids().size() != sub_txn_num) {
code = MetaServiceCode::UNDEFINED_ERR;
ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", expected sub_txn_num=" << sub_txn_num
<< ", txn_info.sub_txn_ids=[";
for (auto id : txn_info.sub_txn_ids()) {
ss << id << ", ";
}
ss << "]";
msg = ss.str();
LOG(WARNING) << msg;
}
txn_info.mutable_sub_txn_ids()->Add(sub_txn_id);
txn_info.mutable_table_ids()->Clear();
for (auto table_id : table_ids) {
txn_info.mutable_table_ids()->Add(table_id);
}
if (!txn_info.SerializeToString(&info_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
msg = ss.str();
return;
}
txn->remove(label_key);
txn->put(info_key, info_val);
txn->put(index_key, index_val);
LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
<< ", remove label_key=" << hex(label_key) << ", put info_key=" << hex(info_key)
<< ", put index_key=" << hex(index_key);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
response->set_sub_txn_id(sub_txn_id);
response->mutable_txn_info()->CopyFrom(txn_info);
}
/**
* 1. Modify the txn state of the txn_id:
* - Remove the table id from table_ids
*/
void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controller,
const AbortSubTxnRequest* request,
AbortSubTxnResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(abort_sub_txn, get, put);
int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
int64_t sub_txn_id = request->has_sub_txn_id() ? request->sub_txn_id() : -1;
int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num() : -1;
int64_t db_id = request->has_db_id() ? request->db_id() : -1;
auto& table_ids = request->table_ids();
if (txn_id < 0 || sub_txn_id < 0 || sub_txn_num < 0 || db_id < 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
<< ", sub_txn_num=" << sub_txn_num << " db_id=" << db_id << ", table_ids=[";
for (auto table_id : table_ids) {
ss << table_id << ", ";
}
ss << "]";
msg = ss.str();
return;
}
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
return;
}
RPC_RATE_LIMIT(abort_sub_txn)
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id
<< " sub_txn_id=" << sub_txn_id << " db_id=" << db_id;
msg = ss.str();
return;
}
// Get and update txn info with db_id and txn_id
std::string info_val; // Will be reused when saving updated txn
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
<< " sub_txn_id=" << sub_txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
TxnInfoPB txn_info;
if (!txn_info.ParseFromString(info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id
<< " sub_txn_id=" << sub_txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
DCHECK(txn_info.txn_id() == txn_id);
if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
code = MetaServiceCode::TXN_INVALID_STATUS;
ss << "transaction status is " << txn_info.status() << " : db_id=" << db_id
<< " txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
// Remove the table id from table_ids; the sub_txn_id does not need to be removed.
if (txn_info.sub_txn_ids().size() != sub_txn_num) {
code = MetaServiceCode::UNDEFINED_ERR;
ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
<< ", expected sub_txn_num=" << sub_txn_num << ", txn_info.sub_txn_ids=[";
for (auto id : txn_info.sub_txn_ids()) {
ss << id << ", ";
}
ss << "]";
msg = ss.str();
LOG(WARNING) << msg;
}
txn_info.mutable_table_ids()->Clear();
for (auto table_id : table_ids) {
txn_info.mutable_table_ids()->Add(table_id);
}
// TODO should we try to delete txn_label_key if begin_sub_txn failed to delete?
if (!txn_info.SerializeToString(&info_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_info when saving, txn_id=" << txn_id
<< " sub_txn_id=" << sub_txn_id;
msg = ss.str();
return;
}
txn->put(info_key, info_val);
LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
<< ", put info_key=" << hex(info_key);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
<< ", err=" << err;
msg = ss.str();
return;
}
response->mutable_txn_info()->CopyFrom(txn_info);
}
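// Abort all PREPARED txns whose coordinator BE matches the given id/ip and
// whose coordinator start time is older than the given start_time, i.e. the
// coordinator has restarted since those txns were started.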
void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcController* controller,
const AbortTxnWithCoordinatorRequest* request,
AbortTxnWithCoordinatorResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(abort_txn_with_coordinator, get);
if (!request->has_id() || !request->has_ip() || !request->has_start_time()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "invalid coordinate id, coordinate ip or coordinate start time.";
return;
}
// TODO: For auth
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
return;
}
RPC_RATE_LIMIT(abort_txn_with_coordinator);
std::string begin_info_key = txn_info_key({instance_id, 0, 0});
std::string end_info_key = txn_info_key({instance_id, INT64_MAX, INT64_MAX});
LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" << hex(end_info_key);
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
msg = "failed to create txn";
code = cast_as<ErrCategory::CREATE>(err);
return;
}
std::unique_ptr<RangeGetIterator> it;
int64_t abort_txn_cnt = 0;
int64_t total_iteration_cnt = 0;
bool need_commit = false;
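// Range-scan all txn_info keys of this instance; after each batch the scan
// resumes from the last returned key plus a trailing '\x00'.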
do {
err = txn->get(begin_info_key, end_info_key, &it, true);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get txn info. err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
while (it->has_next()) {
total_iteration_cnt++;
auto [k, v] = it->next();
VLOG_DEBUG << "check txn info txn_info_key=" << hex(k);
TxnInfoPB info_pb;
if (!info_pb.ParseFromArray(v.data(), v.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "malformed txn running info";
msg = ss.str();
ss << " key=" << hex(k);
LOG(WARNING) << ss.str();
return;
}
const auto& coordinate = info_pb.coordinator();
if (info_pb.status() == TxnStatusPB::TXN_STATUS_PREPARED &&
coordinate.sourcetype() == TXN_SOURCE_TYPE_BE && coordinate.id() == request->id() &&
coordinate.ip() == request->ip() &&
coordinate.start_time() < request->start_time()) {
need_commit = true;
abort_txn_cnt++;
TxnInfoPB return_txn_info;
AbortTxnRequest abort_request; // avoid shadowing the rpc `request`
abort_request.set_db_id(info_pb.db_id());
abort_request.set_txn_id(info_pb.txn_id());
abort_request.set_label(info_pb.label());
abort_request.set_reason("Abort because coordinator BE restarted or stopped");
_abort_txn(instance_id, &abort_request, txn.get(), return_txn_info, ss, code, msg);
}
if (!it->has_next()) {
begin_info_key = k;
}
}
begin_info_key.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
LOG(INFO) << "abort txn count: " << abort_txn_cnt
<< " total iteration count: " << total_iteration_cnt;
if (need_commit) {
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to abort txn kv, cooridnate_id=" << request->id()
<< " coordinate_ip=" << request->ip()
<< "coordinate_start_time=" << request->start_time() << " err=" << err;
msg = ss.str();
return;
}
}
}
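// Decode a txn_running_key and rebuild the txn_info_key of the same txn
// (same instance_id, db_id and txn_id). Returns an empty string on decode
// failure.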
std::string get_txn_info_key_from_txn_running_key(std::string_view txn_running_key) {
std::string conflict_txn_info_key;
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
txn_running_key.remove_prefix(1);
int ret = decode_key(&txn_running_key, &out);
if (ret != 0) [[unlikely]] {
// Failing to decode the txn running key means something is wrong;
// we cannot build the corresponding txn info key.
LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(txn_running_key);
} else {
DCHECK(out.size() == 5) << " key=" << hex(txn_running_key) << " " << out.size();
const std::string& decode_instance_id = std::get<1>(std::get<0>(out[1]));
int64_t db_id = std::get<0>(std::get<0>(out[3]));
int64_t txn_id = std::get<0>(std::get<0>(out[4]));
conflict_txn_info_key = txn_info_key({decode_instance_id, db_id, txn_id});
}
return conflict_txn_info_key;
}
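// Check whether any running (non-timed-out) txn with id smaller than
// end_txn_id still touches one of the requested table ids; conflicting txns
// are returned so the caller (e.g. schema change) can wait for them.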
void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* controller,
const CheckTxnConflictRequest* request,
CheckTxnConflictResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(check_txn_conflict, get);
if (!request->has_db_id() || !request->has_end_txn_id() || (request->table_ids_size() <= 0)) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "invalid db id, end txn id or table_ids.";
return;
}
// TODO: For auth
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
return;
}
RPC_RATE_LIMIT(check_txn_conflict)
int64_t db_id = request->db_id();
std::string begin_running_key = txn_running_key({instance_id, db_id, 0});
std::string end_running_key = txn_running_key({instance_id, db_id, request->end_txn_id()});
LOG(INFO) << "begin_running_key:" << hex(begin_running_key)
<< " end_running_key:" << hex(end_running_key);
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
msg = "failed to create txn";
code = cast_as<ErrCategory::CREATE>(err);
return;
}
//TODO: use set to replace
std::vector<int64_t> src_table_ids(request->table_ids().begin(), request->table_ids().end());
std::sort(src_table_ids.begin(), src_table_ids.end());
std::unique_ptr<RangeGetIterator> it;
int64_t skip_timeout_txn_cnt = 0;
int total_iteration_cnt = 0;
bool finished = true;
do {
err = txn->get(begin_running_key, end_running_key, &it, true);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get txn running info. err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
VLOG_DEBUG << "begin_running_key=" << hex(begin_running_key)
<< " end_running_key=" << hex(end_running_key)
<< " it->has_next()=" << it->has_next();
auto now_time = system_clock::now();
uint64_t check_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
while (it->has_next()) {
total_iteration_cnt++;
auto [k, v] = it->next();
LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k);
TxnRunningPB running_pb;
if (!running_pb.ParseFromArray(v.data(), v.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "malformed txn running info";
msg = ss.str();
ss << " key=" << hex(k);
LOG(WARNING) << ss.str();
return;
}
if (running_pb.timeout_time() < check_time) {
skip_timeout_txn_cnt++;
} else {
LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k) << " " << k
<< " running_pb=" << running_pb.ShortDebugString();
std::vector<int64_t> running_table_ids(running_pb.table_ids().begin(),
running_pb.table_ids().end());
std::sort(running_table_ids.begin(), running_table_ids.end());
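// A conflict exists if the running txn touches any of the requested table ids.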
std::vector<int64_t> result(
std::min(running_table_ids.size(), src_table_ids.size()));
auto iter = std::set_intersection(src_table_ids.begin(), src_table_ids.end(),
running_table_ids.begin(),
running_table_ids.end(), result.begin());
result.resize(iter - result.begin());
if (!result.empty()) {
finished = false;
std::string conflict_txn_info_key = get_txn_info_key_from_txn_running_key(k);
if (!conflict_txn_info_key.empty()) {
std::string conflict_txn_info_val;
err = txn->get(conflict_txn_info_key, &conflict_txn_info_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get txn_info, conflict_txn_info_key="
<< hex(conflict_txn_info_key);
msg = ss.str();
LOG(WARNING) << msg;
return;
}
TxnInfoPB& conflict_txn_info = *response->add_conflict_txns();
if (!conflict_txn_info.ParseFromString(conflict_txn_info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info, conflict_txn_info_key="
<< hex(conflict_txn_info_key);
msg = ss.str();
LOG(WARNING) << msg;
return;
}
}
}
}
if (!it->has_next()) {
begin_running_key = k;
}
}
begin_running_key.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt
<< " conflict txn count: " << response->conflict_txns_size()
<< " total iteration count: " << total_iteration_cnt;
response->set_finished(finished);
}
/**
* @brief Clean finished txns recorded under a single txn label key.
*
* Removes the txn_index_key, txn_info_key and recycle_txn_key of every txn
* whose status is already final (ABORTED or VISIBLE); the label key itself is
* removed when all of its txns are cleaned, otherwise it is rewritten with
* the surviving txn ids.
*
* @param txn_kv kv store to operate on
* @param instance_id instance the label belongs to
* @param db_id database the label belongs to
* @param label_key the txn_label_key to clean
* @param stats accumulates kv read/write statistics of the internal txn
* @param is_versioned_write whether versioned write is enabled for the instance
* @return TxnErrorCode TXN_OK on success
*/
TxnErrorCode internal_clean_label(std::shared_ptr<TxnKv> txn_kv, const std::string_view instance_id,
int64_t db_id, const std::string_view label_key, KVStats& stats,
bool is_versioned_write) {
std::string label_val;
TxnLabelPB label_pb;
int64_t key_size = 0;
int64_t val_size = 0;
std::vector<int64_t> survival_txn_ids;
std::vector<int64_t> clean_txn_ids;
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" << db_id
<< " label_key=" << hex(label_key);
return err;
}
DORIS_CLOUD_DEFER {
if (txn == nullptr) return;
stats.get_bytes += txn->get_bytes();
stats.put_bytes += txn->put_bytes();
stats.del_bytes += txn->delete_bytes();
stats.get_counter += txn->num_get_keys();
stats.put_counter += txn->num_put_keys();
stats.del_counter += txn->num_del_keys();
};
err = txn->get(label_key, &label_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
LOG(WARNING) << "failed to txn get err=" << err << " db_id=" << db_id
<< " label_key=" << hex(label_key);
return err;
}
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
LOG(INFO) << "txn get err=" << err << " db_id=" << db_id << " label_key=" << hex(label_key);
return TxnErrorCode::TXN_OK;
}
if (label_val.size() <= VERSION_STAMP_LEN) {
LOG(INFO) << "label_val.size()=" << label_val.size() << " db_id=" << db_id
<< " label_key=" << hex(label_key);
return TxnErrorCode::TXN_OK;
}
if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) {
LOG(WARNING) << "failed to parse txn label"
<< " db_id=" << db_id << " label_key=" << hex(label_key)
<< " label_val.size()=" << label_val.size();
return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
}
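// For each txn recorded under this label: keep it if it is not yet in a
// final state, otherwise remove its index/info/recycle keys.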
for (auto txn_id : label_pb.txn_ids()) {
const std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
const std::string index_key = txn_index_key({instance_id, txn_id});
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
std::string info_val;
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
LOG_WARNING("info_key get failed")
.tag("info_key", hex(info_key))
.tag("label_key", hex(label_key))
.tag("db_id", db_id)
.tag("txn_id", txn_id)
.tag("err", err);
return err;
}
TxnInfoPB txn_info;
if (!txn_info.ParseFromString(info_val)) {
LOG_WARNING("info_val parse failed")
.tag("info_key", hex(info_key))
.tag("label_key", hex(label_key))
.tag("db_id", db_id)
.tag("txn_id", txn_id)
.tag("size", info_val.size());
return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
}
std::string recycle_val;
if ((txn_info.status() != TxnStatusPB::TXN_STATUS_ABORTED) &&
(txn_info.status() != TxnStatusPB::TXN_STATUS_VISIBLE)) {
// txn status is not a final status
LOG(INFO) << "txn not final state, label_key=" << hex(label_key)
<< " txn_id=" << txn_id;
survival_txn_ids.push_back(txn_id);
DCHECK_EQ(txn->get(recycle_key, &recycle_val), TxnErrorCode::TXN_KEY_NOT_FOUND);
continue;
}
// In versioned write, the recycle key is written only when the txn operation log is recycled.
if (!is_versioned_write) {
DCHECK_EQ(txn->get(recycle_key, &recycle_val), TxnErrorCode::TXN_OK);
}
DCHECK((txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) ||
(txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE));
txn->remove(index_key);
key_size += index_key.size();
txn->remove(info_key);
key_size += info_key.size();
txn->remove(recycle_key);
key_size += recycle_key.size();
clean_txn_ids.push_back(txn_id);
LOG(INFO) << "remove index_key=" << hex(index_key) << " info_key=" << hex(info_key)
<< " recycle_key=" << hex(recycle_key);
}
if (label_pb.txn_ids().size() == clean_txn_ids.size()) {
txn->remove(label_key);
key_size += label_key.size();
LOG(INFO) << "remove label_key=" << hex(label_key);
} else {
label_pb.clear_txn_ids();
for (auto txn_id : survival_txn_ids) {
label_pb.add_txn_ids(txn_id);
}
LOG(INFO) << "rewrite label_pb=" << label_pb.ShortDebugString();
label_val.clear();
if (!label_pb.SerializeToString(&label_val)) {
LOG(INFO) << "failed to serialize label_pb=" << label_pb.ShortDebugString()
<< " label_key=" << hex(label_key);
return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
}
txn->atomic_set_ver_value(label_key, label_val);
key_size += label_key.size();
val_size += label_val.size();
}
err = txn->commit();
TEST_SYNC_POINT_CALLBACK("internal_clean_label:err", &err);
if (err != TxnErrorCode::TXN_OK) {
LOG(INFO) << fmt::format(
"label_key={} key_size={} val_size={} label_pb={} clean_txn_ids={}", hex(label_key),
key_size, val_size, label_pb.ShortDebugString(), fmt::join(clean_txn_ids, " "));
}
return err;
}
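// Clean finished txns of a db: if labels are given, clean the first one;
// otherwise range-scan all label keys of the db and clean them one by one,
// each in its own kv txn (see internal_clean_label).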
void MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* controller,
const CleanTxnLabelRequest* request,
CleanTxnLabelResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(clean_txn_label, get, put, del);
if (!request->has_db_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "missing db id";
LOG(WARNING) << msg;
return;
}
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
LOG(WARNING) << msg;
return;
}
RPC_RATE_LIMIT(clean_txn_label)
const int64_t db_id = request->db_id();
bool is_versioned_write = is_version_write_enabled(instance_id);
// clean label only by db_id
if (request->labels().empty()) {
std::string begin_label_key = txn_label_key({instance_id, db_id, ""});
const std::string end_label_key = txn_label_key({instance_id, db_id + 1, ""});
std::unique_ptr<RangeGetIterator> it;
bool snapshot = true;
int limit = 1000;
TEST_SYNC_POINT_CALLBACK("clean_txn_label:limit", &limit);
do {
std::unique_ptr<Transaction> txn;
auto err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
msg = "failed to create txn";
code = cast_as<ErrCategory::CREATE>(err);
LOG(INFO) << msg << " err=" << err << " begin=" << hex(begin_label_key)
<< " end=" << hex(end_label_key);
return;
}
DORIS_CLOUD_DEFER {
if (txn == nullptr) return;
stats.get_bytes += txn->get_bytes();
stats.get_counter += txn->num_get_keys();
};
err = txn->get(begin_label_key, end_label_key, &it, snapshot, limit);
if (err != TxnErrorCode::TXN_OK) {
msg = fmt::format("failed to txn range get, err={}", err);
code = cast_as<ErrCategory::READ>(err);
LOG(WARNING) << msg << " begin=" << hex(begin_label_key)
<< " end=" << hex(end_label_key);
return;
}
if (!it->has_next()) {
LOG(INFO) << "no keys in the range. begin=" << hex(begin_label_key)
<< " end=" << hex(end_label_key);
break;
}
while (it->has_next()) {
auto [k, v] = it->next();
if (!it->has_next()) {
begin_label_key = k;
LOG(INFO) << "iterator has no more kvs. key=" << hex(k);
}
err = internal_clean_label(txn_kv_, instance_id, db_id, k, stats,
is_versioned_write);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to clean txn label. err={}", err);
LOG(WARNING) << msg << " db_id=" << db_id;
return;
}
}
begin_label_key.push_back('\x00');
} while (it->more());
} else {
const std::string& label = request->labels(0);
const std::string label_key = txn_label_key({instance_id, db_id, label});
TxnErrorCode err = internal_clean_label(txn_kv_, instance_id, db_id, label_key, stats,
is_versioned_write);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to clean txn label. err={}", err);
LOG(WARNING) << msg << " db_id=" << db_id << " label_key=" << hex(label_key);
return;
}
}
code = MetaServiceCode::OK;
}
// get txn id by label
// 1. When the requested statuses are not empty, return a txn id whose
//    status matches any one of them.
// 2. When the requested statuses are empty, return the latest txn id.
void MetaServiceImpl::get_txn_id(::google::protobuf::RpcController* controller,
const GetTxnIdRequest* request, GetTxnIdResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(get_txn_id, get);
if (!request->has_db_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "missing db id";
LOG(WARNING) << msg;
return;
}
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
LOG(WARNING) << msg;
return;
}
RPC_RATE_LIMIT(get_txn_id)
const int64_t db_id = request->db_id();
std::string label = request->label();
const std::string label_key = txn_label_key({instance_id, db_id, label});
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" << db_id
<< " label=" << label;
code = cast_as<ErrCategory::CREATE>(err);
ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label
<< " db_id=" << db_id;
msg = ss.str();
return;
}
std::string label_val;
err = txn->get(label_key, &label_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get failed(), err=" << err << " label=" << label;
msg = ss.str();
return;
}
if (label_val.size() <= VERSION_STAMP_LEN) {
code = MetaServiceCode::TXN_ID_NOT_FOUND;
ss << "transaction not found, label=" << label;
return;
}
TxnLabelPB label_pb;
// label_val.size() > VERSION_STAMP_LEN means the label has previous txn ids.
if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "label_pb.ParseFromArray() failed, label=" << label;
msg = ss.str();
return;
}
if (label_pb.txn_ids_size() == 0) {
code = MetaServiceCode::TXN_ID_NOT_FOUND;
ss << "transaction not found, label=" << label;
msg = ss.str();
return;
}
// find the latest txn
if (request->txn_status_size() == 0) {
response->set_txn_id(*label_pb.txn_ids().rbegin());
return;
}
for (auto& cur_txn_id : label_pb.txn_ids()) {
const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id});
std::string cur_info_val;
err = txn->get(cur_info_key, &cur_info_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
<< " err=" << err;
msg = ss.str();
return;
}
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
// label index and txn info are inconsistent.
code = MetaServiceCode::TXN_ID_NOT_FOUND;
ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
<< " err=" << err;
msg = ss.str();
return;
}
TxnInfoPB cur_txn_info;
if (!cur_txn_info.ParseFromString(cur_info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id
<< " label=" << label << " err=" << err;
msg = ss.str();
return;
}
VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
for (auto txn_status : request->txn_status()) {
if (cur_txn_info.status() == txn_status) {
response->set_txn_id(cur_txn_id);
return;
}
}
}
code = MetaServiceCode::TXN_ID_NOT_FOUND;
ss << "transaction not found, label=" << label;
msg = ss.str();
return;
}
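// Log and export per-commit kv/partition/tablet counters to bvar metrics.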
void record_txn_commit_stats(doris::cloud::Transaction* txn, const std::string& instance_id,
int64_t partition_count, int64_t tablet_count, int64_t txn_id) {
int64_t kv_count = txn->num_put_keys() + txn->num_del_keys() + txn->num_get_keys();
int64_t kv_bytes = txn->get_bytes();
LOG(INFO) << "txn commit stats, instance_id: " << instance_id << ", txn_id: " << txn_id
<< ", kv_count: " << kv_count << ", kv_bytes: " << kv_bytes
<< ", partition_count: " << partition_count << ", tablet_count: " << tablet_count;
g_bvar_ms_txn_commit_with_partition_count << partition_count;
g_bvar_ms_txn_commit_with_tablet_count << tablet_count;
g_bvar_instance_txn_commit_with_partition_count.put({instance_id}, partition_count);
g_bvar_instance_txn_commit_with_tablet_count.put({instance_id}, tablet_count);
}
} // namespace doris::cloud