| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <gen_cpp/cloud.pb.h> |
| |
| #include <chrono> |
| #include <cstdint> |
| #include <limits> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "cpp/sync_point.h" |
| #include "meta-service/doris_txn.h" |
| #include "meta-service/keys.h" |
| #include "meta-service/meta_service.h" |
| #include "meta-service/meta_service_helper.h" |
| #include "meta-service/meta_service_tablet_stats.h" |
| #include "meta-service/txn_kv.h" |
| #include "meta-service/txn_kv_error.h" |
| |
| using namespace std::chrono; |
| |
| namespace doris::cloud { |
| |
| struct TableStats { |
| int64_t updated_row_count = 0; |
| |
| TableStats() = default; |
| |
| TableStats(int64_t num_rows) : updated_row_count(num_rows) {} |
| |
| std::string to_string() const { |
| std::stringstream ss; |
| ss << "updated_row_count: " << updated_row_count; |
| return ss.str(); |
| } |
| }; |
| |
| static void get_pb_from_tablestats(TableStats& stats, TableStatsPB* stats_pb) { |
| stats_pb->set_updated_row_count(stats.updated_row_count); |
| } |
| |
| static void calc_table_stats(std::unordered_map<int64_t, TabletIndexPB>& tablet_ids, |
| std::unordered_map<int64_t, TabletStats>& tablet_stats, |
| std::map<int64_t, TableStats>& table_stats, |
| std::vector<int64_t> base_tablet_ids) { |
| int64_t table_id; |
| |
| VLOG_DEBUG << "base_tablet_ids size: " << base_tablet_ids.size(); |
| for (int64_t tablet_id : base_tablet_ids) { |
| auto it = tablet_stats.find(tablet_id); |
| if (it == tablet_stats.end()) { |
| continue; |
| } |
| const auto& tablet_stat = it->second; |
| table_id = tablet_ids[tablet_id].table_id(); |
| if (table_stats.find(table_id) == table_stats.end()) { |
| table_stats[table_id] = TableStats(tablet_stat.num_rows); |
| } else { |
| table_stats[table_id].updated_row_count += tablet_stat.num_rows; |
| } |
| } |
| } |
| |
| //TODO: we need move begin/commit etc txn to TxnManager |
| void MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller, |
| const BeginTxnRequest* request, BeginTxnResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(begin_txn); |
| if (!request->has_txn_info()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invalid argument, missing txn info"; |
| return; |
| } |
| |
| auto& txn_info = const_cast<TxnInfoPB&>(request->txn_info()); |
| std::string label = txn_info.has_label() ? txn_info.label() : ""; |
| int64_t db_id = txn_info.has_db_id() ? txn_info.db_id() : -1; |
| |
| if (label.empty() || db_id < 0 || txn_info.table_ids().empty() || !txn_info.has_timeout_ms()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "invalid argument, label=" << label << " db_id=" << db_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "cannot find instance_id with cloud_unique_id=" |
| << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| |
| RPC_RATE_LIMIT(begin_txn) |
| //1. Generate version stamp for txn id |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label |
| << " db_id=" << db_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| const std::string label_key = txn_label_key({instance_id, db_id, label}); |
| std::string label_val; |
| err = txn->get(label_key, &label_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "txn->get failed(), err=" << err << " label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| |
| LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err; |
| |
| // err == OK means label has previous txn ids. |
| if (err == TxnErrorCode::TXN_OK) { |
| label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN); |
| } |
| |
| //ret > 0, means label not exist previously. |
| txn->atomic_set_ver_value(label_key, label_val); |
| LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key); |
| |
| TEST_SYNC_POINT_CALLBACK("begin_txn:before:commit_txn:1", &label); |
| err = txn->commit(); |
| TEST_SYNC_POINT_CALLBACK("begin_txn:after:commit_txn:1", &label); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "txn->commit failed(), label=" << label << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| //2. Get txn id from version stamp |
| txn.reset(); |
| |
| err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to create txn when get txn id, label=" << label << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| label_val.clear(); |
| err = txn->get(label_key, &label_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "txn->get() failed, label=" << label << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err; |
| |
| // Generated by TxnKv system |
| int64_t txn_id = 0; |
| int ret = |
| get_txn_id_from_fdb_ts(std::string_view(label_val).substr( |
| label_val.size() - VERSION_STAMP_LEN, label_val.size()), |
| &txn_id); |
| if (ret != 0) { |
| code = MetaServiceCode::TXN_GEN_ID_ERR; |
| ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << ret; |
| msg = ss.str(); |
| return; |
| } |
| |
| LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " txn_id=" << txn_id |
| << " label_val.size()=" << label_val.size(); |
| |
| TxnLabelPB label_pb; |
| if (label_val.size() > VERSION_STAMP_LEN) { |
| //3. Check label |
| //label_val.size() > VERSION_STAMP_LEN means label has previous txn ids. |
| if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "label_pb->ParseFromString() failed, txn_id=" << txn_id << " label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| |
| // Check if label already used, by following steps |
| // 1. get all existing transactions |
| // 2. if there is a PREPARE transaction, check if this is a retry request. |
| // 3. if there is a non-aborted transaction, throw label already used exception. |
| |
| for (auto it = label_pb.txn_ids().rbegin(); it != label_pb.txn_ids().rend(); ++it) { |
| int64_t cur_txn_id = *it; |
| const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id}); |
| std::string cur_info_val; |
| err = txn->get(cur_info_key, &cur_info_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label |
| << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| //label_to_idx and txn info inconsistency. |
| code = MetaServiceCode::TXN_ID_NOT_FOUND; |
| ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label |
| << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| TxnInfoPB cur_txn_info; |
| if (!cur_txn_info.ParseFromString(cur_info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id |
| << " label=" << label << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString(); |
| LOG(INFO) << " size=" << label_pb.txn_ids().size() |
| << " status=" << cur_txn_info.status() << " txn_id=" << txn_id |
| << " label=" << label; |
| if (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { |
| if (label_pb.txn_ids().size() >= config::max_num_aborted_txn) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "too many aborted txn for label=" << label << " txn_id=" << txn_id |
| << ", please check your data quality"; |
| msg = ss.str(); |
| LOG(WARNING) << msg << " label_pb=" << label_pb.ShortDebugString(); |
| return; |
| } |
| break; |
| } |
| |
| if (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED || |
| cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PRECOMMITTED) { |
| // clang-format off |
| if (cur_txn_info.has_request_id() && txn_info.has_request_id() && |
| ((cur_txn_info.request_id().hi() == txn_info.request_id().hi()) && |
| (cur_txn_info.request_id().lo() == txn_info.request_id().lo()))) { |
| |
| response->set_dup_txn_id(cur_txn_info.txn_id()); |
| code = MetaServiceCode::TXN_DUPLICATED_REQ; |
| ss << "db_id=" << db_id << " label=" << label << " txn_id=" << cur_txn_info.txn_id() << " dup begin txn request."; |
| msg = ss.str(); |
| return; |
| } |
| // clang-format on |
| } |
| response->set_txn_status(cur_txn_info.status()); |
| code = MetaServiceCode::TXN_LABEL_ALREADY_USED; |
| ss << "Label [" << label << "] has already been used, relate to txn [" |
| << cur_txn_info.txn_id() << "], status=[" << TxnStatusPB_Name(cur_txn_info.status()) |
| << "]"; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| // Update txn_info to be put into TxnKv |
| // Update txn_id in PB |
| txn_info.set_txn_id(txn_id); |
| // TODO: |
| // check initial status must be TXN_STATUS_PREPARED or TXN_STATUS_UNKNOWN |
| txn_info.set_status(TxnStatusPB::TXN_STATUS_PREPARED); |
| |
| auto now_time = system_clock::now(); |
| uint64_t prepare_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count(); |
| |
| txn_info.set_prepare_time(prepare_time); |
| //4. put txn info and db_tbl |
| const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); |
| std::string info_val; |
| if (!txn_info.SerializeToString(&info_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_info, label=" << label << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| const std::string index_key = txn_index_key({instance_id, txn_id}); |
| std::string index_val; |
| TxnIndexPB index_pb; |
| index_pb.mutable_tablet_index()->set_db_id(db_id); |
| if (!index_pb.SerializeToString(&index_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_index_pb " |
| << "label=" << label << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| const std::string running_key = txn_running_key({instance_id, db_id, txn_id}); |
| std::string running_val; |
| TxnRunningPB running_pb; |
| running_pb.set_timeout_time(prepare_time + txn_info.timeout_ms()); |
| running_pb.mutable_table_ids()->CopyFrom(txn_info.table_ids()); |
| VLOG_DEBUG << "label=" << label << " txn_id=" << txn_id |
| << "running_pb=" << running_pb.ShortDebugString(); |
| if (!running_pb.SerializeToString(&running_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize running_pb label=" << label << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| label_pb.add_txn_ids(txn_id); |
| VLOG_DEBUG << "label=" << label << " txn_id=" << txn_id |
| << "txn_label_pb=" << label_pb.ShortDebugString(); |
| if (!label_pb.SerializeToString(&label_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_label_pb label=" << label << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| txn->atomic_set_ver_value(label_key, label_val); |
| LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key) << " label=" << label |
| << " txn_id=" << txn_id; |
| |
| txn->put(info_key, info_val); |
| txn->put(index_key, index_val); |
| txn->put(running_key, running_val); |
| LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id; |
| LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id; |
| LOG(INFO) << "xxx put index_key=" << hex(index_key) << " txn_id=" << txn_id; |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit txn kv, label=" << label << " txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| TEST_SYNC_POINT_CALLBACK("begin_txn:after:commit_txn:2", &txn_id); |
| response->set_txn_id(txn_id); |
| } |
| |
| void MetaServiceImpl::precommit_txn(::google::protobuf::RpcController* controller, |
| const PrecommitTxnRequest* request, |
| PrecommitTxnResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(precommit_txn); |
| int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1; |
| int64_t db_id = request->has_db_id() ? request->db_id() : -1; |
| if ((txn_id < 0 && db_id < 0)) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "invalid argument, " |
| << "txn_id=" << txn_id << " db_id=" << db_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "cannot find instance_id with cloud_unique_id=" |
| << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| RPC_RATE_LIMIT(precommit_txn); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| //not provide db_id, we need read from disk. |
| if (db_id < 0) { |
| const std::string index_key = txn_index_key({instance_id, txn_id}); |
| std::string index_val; |
| err = txn->get(index_key, &index_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| ss << "failed to get db id with txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| TxnIndexPB index_pb; |
| if (!index_pb.ParseFromString(index_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_info" |
| << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| DCHECK(index_pb.has_tablet_index() == true); |
| DCHECK(index_pb.tablet_index().has_db_id() == true); |
| db_id = index_pb.tablet_index().db_id(); |
| VLOG_DEBUG << " find db_id=" << db_id << " from index"; |
| } else { |
| db_id = request->db_id(); |
| } |
| |
| // Get txn info with db_id and txn_id |
| const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); |
| std::string info_val; // Will be reused when saving updated txn |
| err = txn->get(info_key, &info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| ss << "failed to get db id with db_id=" << db_id << " txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| TxnInfoPB txn_info; |
| if (!txn_info.ParseFromString(info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_inf db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| DCHECK(txn_info.txn_id() == txn_id); |
| if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { |
| code = MetaServiceCode::TXN_ALREADY_ABORTED; |
| ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) { |
| code = MetaServiceCode::TXN_ALREADY_VISIBLE; |
| ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| } |
| |
| if (txn_info.status() == TxnStatusPB::TXN_STATUS_PRECOMMITTED) { |
| code = MetaServiceCode::TXN_ALREADY_PRECOMMITED; |
| ss << "transaction is already precommited: db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| } |
| |
| LOG(INFO) << "before update txn_info=" << txn_info.ShortDebugString(); |
| |
| // Update txn_info |
| txn_info.set_status(TxnStatusPB::TXN_STATUS_PRECOMMITTED); |
| |
| auto now_time = system_clock::now(); |
| uint64_t precommit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count(); |
| txn_info.set_precommit_time(precommit_time); |
| if (request->has_commit_attachment()) { |
| txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment()); |
| } |
| LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString(); |
| |
| info_val.clear(); |
| if (!txn_info.SerializeToString(&info_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_info when saving, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| txn->put(info_key, info_val); |
| LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id; |
| |
| const std::string running_key = txn_running_key({instance_id, db_id, txn_id}); |
| std::string running_val; |
| |
| TxnRunningPB running_pb; |
| running_pb.set_timeout_time(precommit_time + txn_info.precommit_timeout_ms()); |
| running_pb.mutable_table_ids()->CopyFrom(txn_info.table_ids()); |
| if (!running_pb.SerializeToString(&running_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize running_pb, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| txn->put(running_key, running_val); |
| LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id; |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit txn kv, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| void put_routine_load_progress(MetaServiceCode& code, std::string& msg, |
| const std::string& instance_id, const CommitTxnRequest* request, |
| Transaction* txn, int64_t db_id) { |
| std::stringstream ss; |
| int64_t txn_id = request->txn_id(); |
| if (!request->has_commit_attachment()) { |
| ss << "failed to get commit attachment from req, db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment(); |
| RLTaskTxnCommitAttachmentPB commit_attachment = |
| txn_commit_attachment.rl_task_txn_commit_attachment(); |
| int64_t job_id = commit_attachment.job_id(); |
| |
| std::string rl_progress_key; |
| std::string rl_progress_val; |
| bool prev_progress_existed = true; |
| RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id}; |
| rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key); |
| TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| prev_progress_existed = false; |
| } else { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get routine load progress, db_id=" << db_id << " txn_id=" << txn_id |
| << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| RoutineLoadProgressPB prev_progress_info; |
| if (prev_progress_existed) { |
| if (!prev_progress_info.ParseFromString(rl_progress_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse routine load progress, db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| std::string new_progress_val; |
| RoutineLoadProgressPB new_progress_info; |
| new_progress_info.CopyFrom(commit_attachment.progress()); |
| for (auto const& elem : prev_progress_info.partition_to_offset()) { |
| auto it = new_progress_info.partition_to_offset().find(elem.first); |
| if (it == new_progress_info.partition_to_offset().end()) { |
| new_progress_info.mutable_partition_to_offset()->insert(elem); |
| } |
| } |
| |
| std::string new_statistic_val; |
| RoutineLoadJobStatisticPB* new_statistic_info = new_progress_info.mutable_stat(); |
| if (prev_progress_info.has_stat()) { |
| const RoutineLoadJobStatisticPB& prev_statistic_info = prev_progress_info.stat(); |
| |
| new_statistic_info->set_filtered_rows(prev_statistic_info.filtered_rows() + |
| commit_attachment.filtered_rows()); |
| new_statistic_info->set_loaded_rows(prev_statistic_info.loaded_rows() + |
| commit_attachment.loaded_rows()); |
| new_statistic_info->set_unselected_rows(prev_statistic_info.unselected_rows() + |
| commit_attachment.unselected_rows()); |
| new_statistic_info->set_received_bytes(prev_statistic_info.received_bytes() + |
| commit_attachment.received_bytes()); |
| new_statistic_info->set_task_execution_time_ms( |
| prev_statistic_info.task_execution_time_ms() + |
| commit_attachment.task_execution_time_ms()); |
| } else { |
| new_statistic_info->set_filtered_rows(commit_attachment.filtered_rows()); |
| new_statistic_info->set_loaded_rows(commit_attachment.loaded_rows()); |
| new_statistic_info->set_unselected_rows(commit_attachment.unselected_rows()); |
| new_statistic_info->set_received_bytes(commit_attachment.received_bytes()); |
| new_statistic_info->set_task_execution_time_ms(commit_attachment.task_execution_time_ms()); |
| } |
| |
| if (!new_progress_info.SerializeToString(&new_progress_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize new progress val, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| txn->put(rl_progress_key, new_progress_val); |
| LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key) |
| << " routine load new progress: " << new_progress_info.ShortDebugString(); |
| } |
| |
| void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller, |
| const GetRLTaskCommitAttachRequest* request, |
| GetRLTaskCommitAttachResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_rl_task_commit_attach); |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| RPC_RATE_LIMIT(get_rl_task_commit_attach) |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "filed to create txn, err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (!request->has_db_id() || !request->has_job_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty db_id or job_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| |
| int64_t db_id = request->db_id(); |
| int64_t job_id = request->job_id(); |
| std::string rl_progress_key; |
| std::string rl_progress_val; |
| RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id}; |
| rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key); |
| err = txn->get(rl_progress_key, &rl_progress_val); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND; |
| ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } else if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get progress info, db_id=" << db_id << " job_id=" << job_id |
| << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| RLTaskTxnCommitAttachmentPB* commit_attach = response->mutable_commit_attach(); |
| RoutineLoadProgressPB* progress_info = commit_attach->mutable_progress(); |
| if (!progress_info->ParseFromString(rl_progress_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse progress info, db_id=" << db_id << " job_id=" << job_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (progress_info->has_stat()) { |
| const RoutineLoadJobStatisticPB& statistic_info = progress_info->stat(); |
| commit_attach->set_filtered_rows(statistic_info.filtered_rows()); |
| commit_attach->set_loaded_rows(statistic_info.loaded_rows()); |
| commit_attach->set_unselected_rows(statistic_info.unselected_rows()); |
| commit_attach->set_received_bytes(statistic_info.received_bytes()); |
| commit_attach->set_task_execution_time_ms(statistic_info.task_execution_time_ms()); |
| } |
| } |
| |
| void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* controller, |
| const ResetRLProgressRequest* request, |
| ResetRLProgressResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(reset_rl_progress); |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| RPC_RATE_LIMIT(reset_rl_progress) |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "filed to create txn, err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (!request->has_db_id() || !request->has_job_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty db_id or job_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| |
| int64_t db_id = request->db_id(); |
| int64_t job_id = request->job_id(); |
| std::string rl_progress_key; |
| std::string rl_progress_val; |
| RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id}; |
| rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key); |
| |
| if (request->partition_to_offset().size() == 0) { |
| txn->remove(rl_progress_key); |
| LOG(INFO) << "remove rl_progress_key key=" << hex(rl_progress_key); |
| } |
| |
| if (request->partition_to_offset().size() > 0) { |
| bool prev_progress_existed = true; |
| RoutineLoadProgressPB prev_progress_info; |
| TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| prev_progress_existed = false; |
| } else { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get routine load progress, db_id=" << db_id << "job_id=" << job_id |
| << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| } |
| if (prev_progress_existed) { |
| if (!prev_progress_info.ParseFromString(rl_progress_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse routine load progress, db_id=" << db_id |
| << "job_id=" << job_id; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| std::string new_progress_val; |
| RoutineLoadProgressPB new_progress_info; |
| for (auto const& elem : request->partition_to_offset()) { |
| new_progress_info.mutable_partition_to_offset()->insert(elem); |
| } |
| if (request->partition_to_offset().size() > 0) { |
| for (auto const& elem : prev_progress_info.partition_to_offset()) { |
| auto it = new_progress_info.partition_to_offset().find(elem.first); |
| if (it == new_progress_info.partition_to_offset().end()) { |
| new_progress_info.mutable_partition_to_offset()->insert(elem); |
| } |
| } |
| } |
| |
| if (!new_progress_info.SerializeToString(&new_progress_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize new progress val" |
| << "db_id=" << db_id << "job_id=" << job_id; |
| msg = ss.str(); |
| return; |
| } |
| txn->put(rl_progress_key, new_progress_val); |
| LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key); |
| } |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to commit progress info, db_id=" << db_id << " job_id=" << job_id |
| << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| void scan_tmp_rowset( |
| const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv> txn_kv, |
| MetaServiceCode& code, std::string& msg, int64_t* db_id, |
| std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>* tmp_rowsets_meta) { |
| // Create a readonly txn for scan tmp rowset |
| std::stringstream ss; |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to create txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| // Get db id with txn id |
| std::string index_val; |
| const std::string index_key = txn_index_key({instance_id, txn_id}); |
| err = txn->get(index_key, &index_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get db id, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| TxnIndexPB index_pb; |
| if (!index_pb.ParseFromString(index_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_index_pb, txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| DCHECK(index_pb.has_tablet_index() == true); |
| DCHECK(index_pb.tablet_index().has_db_id() == true); |
| *db_id = index_pb.tablet_index().db_id(); |
| |
| // Get temporary rowsets involved in the txn |
| // This is a range scan |
| MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, txn_id, 0}; |
| MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, txn_id + 1, 0}; |
| std::string rs_tmp_key0; |
| std::string rs_tmp_key1; |
| meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0); |
| meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1); |
| |
| int num_rowsets = 0; |
| std::unique_ptr<int, std::function<void(int*)>> defer_log_range( |
| (int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets, &txn_id](int*) { |
| LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id |
| << " num_rowsets=" << num_rowsets << " range=[" << hex(rs_tmp_key0) << "," |
| << hex(rs_tmp_key1) << ")"; |
| }); |
| |
| std::unique_ptr<RangeGetIterator> it; |
| do { |
| err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true); |
| if (err == TxnErrorCode::TXN_TOO_OLD) { |
| err = txn_kv->create_txn(&txn); |
| if (err == TxnErrorCode::TXN_OK) { |
| err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true); |
| } |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "internal error, failed to get tmp rowset while committing, txn_id=" << txn_id |
| << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| while (it->has_next()) { |
| auto [k, v] = it->next(); |
| LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << " txn_id=" << txn_id; |
| tmp_rowsets_meta->emplace_back(); |
| if (!tmp_rowsets_meta->back().second.ParseFromArray(v.data(), v.size())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "malformed rowset meta, unable to initialize, txn_id=" << txn_id |
| << " key=" << hex(k); |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| // Save keys that will be removed later |
| tmp_rowsets_meta->back().first = std::string(k.data(), k.size()); |
| ++num_rowsets; |
| if (!it->has_next()) rs_tmp_key0 = k; |
| } |
| rs_tmp_key0.push_back('\x00'); // Update to next smallest key for iteration |
| } while (it->more()); |
| |
| VLOG_DEBUG << "txn_id=" << txn_id << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta->size(); |
| return; |
| } |
| |
| void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stats, |
| std::unique_ptr<Transaction>& txn, MetaServiceCode& code, |
| std::string& msg) { |
| if (config::split_tablet_stats) { |
| if (stats.num_segs > 0) { |
| std::string data_size_key; |
| stats_tablet_data_size_key(info, &data_size_key); |
| txn->atomic_add(data_size_key, stats.data_size); |
| std::string num_rows_key; |
| stats_tablet_num_rows_key(info, &num_rows_key); |
| txn->atomic_add(num_rows_key, stats.num_rows); |
| std::string num_segs_key; |
| stats_tablet_num_segs_key(info, &num_segs_key); |
| txn->atomic_add(num_segs_key, stats.num_segs); |
| std::string index_size_key; |
| stats_tablet_index_size_key(info, &index_size_key); |
| txn->atomic_add(index_size_key, stats.index_size); |
| std::string segment_size_key; |
| stats_tablet_segment_size_key(info, &segment_size_key); |
| txn->atomic_add(segment_size_key, stats.segment_size); |
| } |
| std::string num_rowsets_key; |
| stats_tablet_num_rowsets_key(info, &num_rowsets_key); |
| txn->atomic_add(num_rowsets_key, stats.num_rowsets); |
| } else { |
| std::string key; |
| stats_tablet_key(info, &key); |
| std::string val; |
| TxnErrorCode err = txn->get(key, &val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err, |
| std::get<4>(info)); |
| return; |
| } |
| TabletStatsPB stats_pb; |
| if (!stats_pb.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("malformed tablet stats value, key={}", hex(key)); |
| return; |
| } |
| stats_pb.set_data_size(stats_pb.data_size() + stats.data_size); |
| stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows); |
| stats_pb.set_num_rowsets(stats_pb.num_rowsets() + stats.num_rowsets); |
| stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs); |
| stats_pb.set_index_size(stats_pb.index_size() + stats.index_size); |
| stats_pb.set_segment_size(stats_pb.segment_size() + stats.segment_size); |
| stats_pb.SerializeToString(&val); |
| txn->put(key, val); |
| LOG(INFO) << "put stats_tablet_key key=" << hex(key); |
| } |
| } |
| |
| // process mow table, check lock and remove pending key |
| void process_mow_when_commit_txn( |
| const CommitTxnRequest* request, const std::string& instance_id, MetaServiceCode& code, |
| std::string& msg, std::unique_ptr<Transaction>& txn, |
| std::unordered_map<int64_t, std::vector<int64_t>>& table_id_tablet_ids) { |
| int64_t txn_id = request->txn_id(); |
| std::stringstream ss; |
| std::vector<std::string> lock_keys; |
| lock_keys.reserve(request->mow_table_ids().size()); |
| for (auto table_id : request->mow_table_ids()) { |
| lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, table_id, -1})); |
| } |
| std::vector<std::optional<std::string>> lock_values; |
| TxnErrorCode err = txn->batch_get(&lock_values, lock_keys); |
| if (err != TxnErrorCode::TXN_OK) { |
| ss << "failed to get delete bitmap update lock key info, instance_id=" << instance_id |
| << " err=" << err; |
| msg = ss.str(); |
| code = cast_as<ErrCategory::READ>(err); |
| LOG(WARNING) << msg << " txn_id=" << txn_id; |
| return; |
| } |
| size_t total_locks = lock_keys.size(); |
| for (size_t i = 0; i < total_locks; i++) { |
| int64_t table_id = request->mow_table_ids(i); |
| // When the key does not exist, it means the lock has been acquired |
| // by another transaction and successfully committed. |
| if (!lock_values[i].has_value()) { |
| ss << "get delete bitmap update lock info, lock is expired" |
| << " table_id=" << table_id << " key=" << hex(lock_keys[i]) << " txn_id=" << txn_id; |
| code = MetaServiceCode::LOCK_EXPIRED; |
| msg = ss.str(); |
| LOG(WARNING) << msg << " txn_id=" << txn_id; |
| return; |
| } |
| |
| DeleteBitmapUpdateLockPB lock_info; |
| if (!lock_info.ParseFromString(lock_values[i].value())) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse DeleteBitmapUpdateLockPB"; |
| LOG(WARNING) << msg << " txn_id=" << txn_id; |
| return; |
| } |
| if (lock_info.lock_id() != request->txn_id()) { |
| ss << "lock is expired, locked by lock_id=" << lock_info.lock_id(); |
| msg = ss.str(); |
| code = MetaServiceCode::LOCK_EXPIRED; |
| return; |
| } |
| txn->remove(lock_keys[i]); |
| LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_keys[i]) |
| << " table_id=" << table_id << " txn_id=" << txn_id; |
| |
| for (auto tablet_id : table_id_tablet_ids[table_id]) { |
| std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id}); |
| txn->remove(pending_key); |
| LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" << hex(pending_key) |
| << " txn_id=" << txn_id; |
| } |
| } |
| lock_keys.clear(); |
| lock_values.clear(); |
| } |
| |
| /** |
| * 0. Extract txn_id from request |
| * 1. Get db id from TxnKv with txn_id |
| * 2. Get TxnInfo from TxnKv with db_id and txn_id |
| * 3. Get tmp rowset meta, there may be several or hundred of tmp rowsets |
| * 4. Get versions of each rowset |
| * 5. Put rowset meta, which will be visible to user |
| * 6. Put TxnInfo back into TxnKv with updated txn status (committed) |
| * 7. Update versions of each partition |
| * 8. Remove tmp rowset meta |
| * |
| * Note: getting version and all changes maded are in a single TxnKv transaction: |
| * step 5, 6, 7, 8 |
| */ |
| void commit_txn_immediately( |
| const CommitTxnRequest* request, CommitTxnResponse* response, |
| std::shared_ptr<TxnKv>& txn_kv, std::shared_ptr<TxnLazyCommitter>& txn_lazy_committer, |
| MetaServiceCode& code, std::string& msg, const std::string& instance_id, int64_t db_id, |
| std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta, |
| TxnErrorCode& err) { |
| std::stringstream ss; |
| int64_t txn_id = request->txn_id(); |
| do { |
| TEST_SYNC_POINT_CALLBACK("commit_txn_immediately:begin", &txn_id); |
| int64_t last_pending_txn_id = 0; |
| std::unique_ptr<Transaction> txn; |
| err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to create txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| // Get txn info with db_id and txn_id |
| std::string info_val; // Will be reused when saving updated txn |
| const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); |
| err = txn->get(info_key, &info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| ss << "transaction [" << txn_id << "] not found, db_id=" << db_id; |
| } else { |
| ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id |
| << " err=" << err; |
| } |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| TxnInfoPB txn_info; |
| if (!txn_info.ParseFromString(info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| // TODO: do more check like txn state, 2PC etc. |
| DCHECK(txn_info.txn_id() == txn_id); |
| if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { |
| code = MetaServiceCode::TXN_ALREADY_ABORTED; |
| ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) { |
| if (request->has_is_2pc() && request->is_2pc()) { |
| code = MetaServiceCode::TXN_ALREADY_VISIBLE; |
| ss << "transaction [" << txn_id << "] is already visible, not pre-committed."; |
| msg = ss.str(); |
| LOG(INFO) << msg; |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| return; |
| } |
| code = MetaServiceCode::OK; |
| ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(INFO) << msg; |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| return; |
| } |
| |
| if (request->has_is_2pc() && request->is_2pc() && |
| txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) { |
| code = MetaServiceCode::TXN_INVALID_STATUS; |
| ss << "transaction is prepare, not pre-committed: db_id=" << db_id << " txn_id" |
| << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString(); |
| |
| // Prepare rowset meta and new_versions |
| // Read tablet indexes in batch. |
| std::vector<std::string> tablet_idx_keys; |
| for (auto& [_, i] : tmp_rowsets_meta) { |
| tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()})); |
| } |
| std::vector<std::optional<std::string>> tablet_idx_values; |
| err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, |
| Transaction::BatchGetOptions(false)); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get tablet table index ids, err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg << " txn_id=" << txn_id; |
| return; |
| } |
| |
| size_t total_rowsets = tmp_rowsets_meta.size(); |
| // tablet_id -> {table/index/partition}_id |
| std::unordered_map<int64_t, TabletIndexPB> tablet_ids; |
| // table_id -> tablets_ids |
| std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids; |
| for (size_t i = 0; i < total_rowsets; i++) { |
| uint64_t tablet_id = tmp_rowsets_meta[i].second.tablet_id(); |
| if (!tablet_idx_values[i].has_value()) [[unlikely]] { |
| // The value must existed |
| code = MetaServiceCode::KV_TXN_GET_ERR; |
| ss << "failed to get tablet table index ids, err=not found" |
| << " tablet_id=" << tablet_id << " key=" << hex(tablet_idx_keys[i]); |
| msg = ss.str(); |
| LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id; |
| return; |
| } |
| if (!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "malformed tablet index value tablet_id=" << tablet_id |
| << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id); |
| VLOG_DEBUG << "tablet_id:" << tablet_id |
| << " value:" << tablet_ids[tablet_id].ShortDebugString(); |
| } |
| |
| tablet_idx_keys.clear(); |
| tablet_idx_values.clear(); |
| |
| // {table/partition} -> version |
| std::unordered_map<std::string, uint64_t> new_versions; |
| std::vector<std::string> version_keys; |
| for (auto& [_, i] : tmp_rowsets_meta) { |
| int64_t tablet_id = i.tablet_id(); |
| int64_t table_id = tablet_ids[tablet_id].table_id(); |
| int64_t partition_id = i.partition_id(); |
| std::string ver_key = |
| partition_version_key({instance_id, db_id, table_id, partition_id}); |
| if (new_versions.count(ver_key) == 0) { |
| new_versions.insert({ver_key, 0}); |
| version_keys.push_back(std::move(ver_key)); |
| } |
| } |
| std::vector<std::optional<std::string>> version_values; |
| err = txn->batch_get(&version_values, version_keys); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get partition versions, err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg << " txn_id=" << txn_id; |
| return; |
| } |
| size_t total_versions = version_keys.size(); |
| for (size_t i = 0; i < total_versions; i++) { |
| int64_t version; |
| if (version_values[i].has_value()) { |
| VersionPB version_pb; |
| if (!version_pb.ParseFromString(version_values[i].value())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse version pb txn_id=" << txn_id |
| << " key=" << hex(version_keys[i]); |
| msg = ss.str(); |
| return; |
| } |
| if (version_pb.pending_txn_ids_size() > 0) { |
| DCHECK(version_pb.pending_txn_ids_size() == 1); |
| last_pending_txn_id = version_pb.pending_txn_ids(0); |
| DCHECK(last_pending_txn_id > 0); |
| break; |
| } |
| version = version_pb.version(); |
| } else { |
| version = 1; |
| } |
| new_versions[version_keys[i]] = version + 1; |
| last_pending_txn_id = 0; |
| } |
| version_keys.clear(); |
| version_values.clear(); |
| |
| if (last_pending_txn_id > 0) { |
| txn.reset(); |
| TEST_SYNC_POINT_CALLBACK("commit_txn_immediately::advance_last_pending_txn_id", |
| &last_pending_txn_id); |
| std::shared_ptr<TxnLazyCommitTask> task = |
| txn_lazy_committer->submit(instance_id, last_pending_txn_id); |
| |
| std::tie(code, msg) = task->wait(); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id |
| << " code=" << code << " msg=" << msg; |
| return; |
| } |
| last_pending_txn_id = 0; |
| continue; |
| } |
| |
| std::vector<std::pair<std::string, std::string>> rowsets; |
| std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats |
| rowsets.reserve(tmp_rowsets_meta.size()); |
| for (auto& [_, i] : tmp_rowsets_meta) { |
| int64_t tablet_id = i.tablet_id(); |
| int64_t table_id = tablet_ids[tablet_id].table_id(); |
| int64_t partition_id = i.partition_id(); |
| std::string ver_key = |
| partition_version_key({instance_id, db_id, table_id, partition_id}); |
| if (new_versions[ver_key] == 0) [[unlikely]] { |
| // it is impossible. |
| code = MetaServiceCode::UNDEFINED_ERR; |
| ss << "failed to get partition version key, the target version not exists in " |
| "new_versions." |
| << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(ERROR) << msg; |
| return; |
| } |
| |
| // Update rowset version |
| int64_t new_version = new_versions[ver_key]; |
| i.set_start_version(new_version); |
| i.set_end_version(new_version); |
| |
| std::string key = meta_rowset_key({instance_id, tablet_id, i.end_version()}); |
| std::string val; |
| if (!i.SerializeToString(&val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize rowset_meta, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| rowsets.emplace_back(std::move(key), std::move(val)); |
| |
| // Accumulate affected rows |
| auto& stats = tablet_stats[tablet_id]; |
| stats.data_size += i.total_disk_size(); |
| stats.num_rows += i.num_rows(); |
| ++stats.num_rowsets; |
| stats.num_segs += i.num_segments(); |
| stats.index_size += i.index_disk_size(); |
| stats.segment_size += i.data_disk_size(); |
| } // for tmp_rowsets_meta |
| |
| process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code; |
| return; |
| } |
| |
| // Save rowset meta |
| for (auto& i : rowsets) { |
| size_t rowset_size = i.first.size() + i.second.size(); |
| txn->put(i.first, i.second); |
| LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" << txn_id |
| << " rowset_size=" << rowset_size; |
| } |
| |
| // Save versions |
| int64_t version_update_time_ms = |
| duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
| response->set_version_update_time_ms(version_update_time_ms); |
| for (auto& i : new_versions) { |
| std::string ver_val; |
| VersionPB version_pb; |
| version_pb.set_version(i.second); |
| version_pb.set_update_time_ms(version_update_time_ms); |
| if (!version_pb.SerializeToString(&ver_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize version_pb when saving, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| txn->put(i.first, ver_val); |
| LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " version:" << i.second |
| << " txn_id=" << txn_id << " update_time=" << version_update_time_ms; |
| |
| std::string_view ver_key = i.first; |
| ver_key.remove_prefix(1); // Remove key space |
| // PartitionVersionKeyInfo {instance_id, db_id, table_id, partition_id} |
| std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; |
| int ret = decode_key(&ver_key, &out); |
| if (ret != 0) [[unlikely]] { |
| // decode version key error means this is something wrong, |
| // we can not continue this txn |
| LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(ver_key); |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = "decode version key error"; |
| return; |
| } |
| |
| int64_t table_id = std::get<int64_t>(std::get<0>(out[4])); |
| int64_t partition_id = std::get<int64_t>(std::get<0>(out[5])); |
| VLOG_DEBUG << " table_id=" << table_id << " partition_id=" << partition_id; |
| |
| response->add_table_ids(table_id); |
| response->add_partition_ids(partition_id); |
| response->add_versions(i.second); |
| } |
| |
| // Save table versions |
| for (auto& i : table_id_tablet_ids) { |
| std::string ver_key = table_version_key({instance_id, db_id, i.first}); |
| txn->atomic_add(ver_key, 1); |
| LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) |
| << " txn_id=" << txn_id; |
| } |
| |
| LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString(); |
| |
| // Update txn_info |
| txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE); |
| |
| auto now_time = system_clock::now(); |
| uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count(); |
| if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) { |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id); |
| LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time() |
| << " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time; |
| return; |
| } |
| txn_info.set_commit_time(commit_time); |
| txn_info.set_finish_time(commit_time); |
| if (request->has_commit_attachment()) { |
| txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment()); |
| } |
| LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString(); |
| info_val.clear(); |
| if (!txn_info.SerializeToString(&info_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_info when saving, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| txn->put(info_key, info_val); |
| LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id; |
| |
| // Update stats of affected tablet |
| for (auto& [tablet_id, stats] : tablet_stats) { |
| DCHECK(tablet_ids.count(tablet_id)); |
| auto& tablet_idx = tablet_ids[tablet_id]; |
| StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), tablet_idx.index_id(), |
| tablet_idx.partition_id(), tablet_id}; |
| update_tablet_stats(info, stats, txn, code, msg); |
| if (code != MetaServiceCode::OK) return; |
| } |
| // Remove tmp rowset meta |
| for (auto& [k, _] : tmp_rowsets_meta) { |
| txn->remove(k); |
| LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id; |
| } |
| |
| const std::string running_key = txn_running_key({instance_id, db_id, txn_id}); |
| LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" << txn_id; |
| txn->remove(running_key); |
| |
| std::string recycle_val; |
| std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id}); |
| RecycleTxnPB recycle_pb; |
| recycle_pb.set_creation_time(commit_time); |
| recycle_pb.set_label(txn_info.label()); |
| |
| if (!recycle_pb.SerializeToString(&recycle_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize recycle_pb, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| txn->put(recycle_key, recycle_val); |
| |
| if (txn_info.load_job_source_type() == |
| LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) { |
| put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id); |
| } |
| |
| LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) |
| << " txn_id=" << txn_id; |
| LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() |
| << " del_size=" << txn->delete_bytes() << " num_put_keys=" << txn->num_put_keys() |
| << " num_del_keys=" << txn->num_del_keys() |
| << " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id; |
| |
| TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_immediately::before_commit", &err, &code); |
| // Finally we are done... |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| // calculate table stats from tablets stats |
| std::map<int64_t /*table_id*/, TableStats> table_stats; |
| std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(), |
| request->base_tablet_ids().end()); |
| calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids); |
| for (const auto& pair : table_stats) { |
| TableStatsPB* stats_pb = response->add_table_stats(); |
| auto table_id = pair.first; |
| auto stats = pair.second; |
| get_pb_from_tablestats(stats, stats_pb); |
| stats_pb->set_table_id(table_id); |
| VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id |
| << " table_id=" << table_id |
| << " updated_row_count=" << stats_pb->updated_row_count(); |
| } |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| TEST_SYNC_POINT_CALLBACK("commit_txn_immediately::finish", &code); |
| break; |
| } while (true); |
| } // end commit_txn_immediately |
| |
| void get_tablet_indexes( |
| const std::string& instance_id, int64_t txn_id, |
| const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta, |
| std::unique_ptr<Transaction>& txn, MetaServiceCode& code, std::string& msg, |
| std::unordered_map<int64_t, TabletIndexPB>* tablet_ids, |
| std::unordered_map<int64_t, std::vector<int64_t>>* table_id_tablet_ids, |
| bool* need_repair_tablet_idx) { |
| // Read tablet indexes in batch. |
| std::stringstream ss; |
| std::vector<std::string> tablet_idx_keys; |
| for (auto& [_, i] : tmp_rowsets_meta) { |
| tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()})); |
| } |
| std::vector<std::optional<std::string>> tablet_idx_values; |
| TxnErrorCode err = |
| txn->batch_get(&tablet_idx_values, tablet_idx_keys, Transaction::BatchGetOptions(true)); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get tablet table index ids, err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg << " txn_id=" << txn_id; |
| return; |
| } |
| |
| for (size_t i = 0; i < tmp_rowsets_meta.size(); i++) { |
| uint64_t tablet_id = tmp_rowsets_meta[i].second.tablet_id(); |
| if (!tablet_idx_values[i].has_value()) [[unlikely]] { |
| // The value must existed |
| code = MetaServiceCode::KV_TXN_GET_ERR; |
| ss << "failed to get tablet table index ids, err=not found" |
| << " tablet_id=" << tablet_id << " key=" << hex(tablet_idx_keys[i]); |
| msg = ss.str(); |
| LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id; |
| return; |
| } |
| if (!(*tablet_ids)[tablet_id].ParseFromString(tablet_idx_values[i].value())) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "malformed tablet index value tablet_id=" << tablet_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| if (!(*tablet_ids)[tablet_id].has_db_id()) { |
| *need_repair_tablet_idx = true; |
| } |
| (*table_id_tablet_ids)[(*tablet_ids)[tablet_id].table_id()].push_back(tablet_id); |
| VLOG_DEBUG << "tablet_id:" << tablet_id |
| << " value:" << (*tablet_ids)[tablet_id].ShortDebugString(); |
| } |
| |
| tablet_idx_keys.clear(); |
| tablet_idx_values.clear(); |
| } |
| |
| // rewrite TabletIndexPB for fill db_id, in case of historical reasons |
| // TabletIndexPB missing db_id |
| void repair_tablet_index( |
| std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string& msg, |
| const std::string& instance_id, int64_t db_id, int64_t txn_id, |
| const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta) { |
| std::stringstream ss; |
| std::vector<std::string> tablet_idx_keys; |
| for (auto& [_, i] : tmp_rowsets_meta) { |
| tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()})); |
| } |
| |
| for (size_t i = 0; i < tablet_idx_keys.size(); i += config::max_tablet_index_num_per_batch) { |
| size_t end = (i + config::max_tablet_index_num_per_batch) > tablet_idx_keys.size() |
| ? tablet_idx_keys.size() |
| : i + config::max_tablet_index_num_per_batch; |
| const std::vector<std::string> sub_tablet_idx_keys(tablet_idx_keys.begin() + i, |
| tablet_idx_keys.begin() + end); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to create txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| std::vector<std::optional<std::string>> tablet_idx_values; |
| // batch get snapshot is false |
| err = txn->batch_get(&tablet_idx_values, sub_tablet_idx_keys, |
| Transaction::BatchGetOptions(false)); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get tablet table index ids, err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg << " txn_id=" << txn_id; |
| return; |
| } |
| DCHECK(tablet_idx_values.size() <= config::max_tablet_index_num_per_batch); |
| |
| for (size_t j = 0; j < sub_tablet_idx_keys.size(); j++) { |
| if (!tablet_idx_values[j].has_value()) [[unlikely]] { |
| // The value must existed |
| code = MetaServiceCode::KV_TXN_GET_ERR; |
| ss << "failed to get tablet table index ids, err=not found" |
| << " key=" << hex(tablet_idx_keys[j]); |
| msg = ss.str(); |
| LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id; |
| return; |
| } |
| TabletIndexPB tablet_idx_pb; |
| if (!tablet_idx_pb.ParseFromString(tablet_idx_values[j].value())) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "malformed tablet index value key=" << hex(tablet_idx_keys[j]) |
| << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| if (!tablet_idx_pb.has_db_id()) { |
| tablet_idx_pb.set_db_id(db_id); |
| std::string idx_val; |
| if (!tablet_idx_pb.SerializeToString(&idx_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize tablet index value key=" << hex(tablet_idx_keys[j]) |
| << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| txn->put(sub_tablet_idx_keys[j], idx_val); |
| LOG(INFO) << " repaire tablet index txn_id=" << txn_id |
| << " tablet_idx_pb:" << tablet_idx_pb.ShortDebugString() |
| << " key=" << hex(sub_tablet_idx_keys[j]); |
| } |
| } |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| } |
| code = MetaServiceCode::OK; |
| } |
| |
| void commit_txn_eventually( |
| const CommitTxnRequest* request, CommitTxnResponse* response, |
| std::shared_ptr<TxnKv>& txn_kv, std::shared_ptr<TxnLazyCommitter>& txn_lazy_committer, |
| MetaServiceCode& code, std::string& msg, const std::string& instance_id, int64_t db_id, |
| const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta) { |
| StopWatch sw; |
| std::unique_ptr<int, std::function<void(int*)>> defer_status((int*)0x01, [&](int*) { |
| if (config::use_detailed_metrics && !instance_id.empty()) { |
| g_bvar_ms_commit_txn_eventually.put(instance_id, sw.elapsed_us()); |
| } |
| }); |
| |
| std::stringstream ss; |
| TxnErrorCode err = TxnErrorCode::TXN_OK; |
| int64_t txn_id = request->txn_id(); |
| |
| do { |
| TEST_SYNC_POINT_CALLBACK("commit_txn_eventually:begin", &txn_id); |
| int64_t last_pending_txn_id = 0; |
| std::unique_ptr<Transaction> txn; |
| err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to create txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| // tablet_id -> {table/index/partition}_id |
| std::unordered_map<int64_t, TabletIndexPB> tablet_ids; |
| // table_id -> tablets_ids |
| std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids; |
| bool need_repair_tablet_idx = false; |
| get_tablet_indexes(instance_id, txn_id, tmp_rowsets_meta, txn, code, msg, &tablet_ids, |
| &table_id_tablet_ids, &need_repair_tablet_idx); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "get_tablet_indexes failed, txn_id=" << txn_id << " code=" << code; |
| return; |
| } |
| |
| TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::need_repair_tablet_idx", |
| &need_repair_tablet_idx); |
| if (need_repair_tablet_idx) { |
| txn.reset(); |
| repair_tablet_index(txn_kv, code, msg, instance_id, db_id, txn_id, tmp_rowsets_meta); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "repair_tablet_index failed, txn_id=" << txn_id << " code=" << code; |
| return; |
| } |
| continue; |
| } |
| |
| // <partition_version_key, version> |
| std::unordered_map<std::string, uint64_t> new_versions; |
| std::vector<std::string> version_keys; |
| for (auto& [_, i] : tmp_rowsets_meta) { |
| int64_t tablet_id = i.tablet_id(); |
| int64_t table_id = tablet_ids[tablet_id].table_id(); |
| int64_t partition_id = i.partition_id(); |
| std::string ver_key = |
| partition_version_key({instance_id, db_id, table_id, partition_id}); |
| if (new_versions.count(ver_key) == 0) { |
| new_versions.insert({ver_key, 0}); |
| version_keys.push_back(std::move(ver_key)); |
| } |
| } |
| |
| std::vector<std::optional<std::string>> version_values; |
| err = txn->batch_get(&version_values, version_keys); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get partition versions, err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg << " txn_id=" << txn_id; |
| return; |
| } |
| |
| for (size_t i = 0; i < version_keys.size(); i++) { |
| int64_t version; |
| if (version_values[i].has_value()) { |
| VersionPB version_pb; |
| if (!version_pb.ParseFromString(version_values[i].value())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse version pb txn_id=" << txn_id |
| << " key=" << hex(version_keys[i]); |
| msg = ss.str(); |
| return; |
| } |
| if (version_pb.pending_txn_ids_size() > 0) { |
| DCHECK(version_pb.pending_txn_ids_size() == 1); |
| last_pending_txn_id = version_pb.pending_txn_ids(0); |
| DCHECK(last_pending_txn_id > 0); |
| break; |
| } |
| version = version_pb.version(); |
| } else { |
| version = 1; |
| } |
| new_versions[version_keys[i]] = version; |
| last_pending_txn_id = 0; |
| } |
| TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::last_pending_txn_id", |
| &last_pending_txn_id); |
| |
| if (last_pending_txn_id > 0) { |
| TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::advance_last_pending_txn_id", |
| &last_pending_txn_id); |
| txn.reset(); |
| std::shared_ptr<TxnLazyCommitTask> task = |
| txn_lazy_committer->submit(instance_id, last_pending_txn_id); |
| |
| std::tie(code, msg) = task->wait(); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id |
| << " code=" << code << " msg=" << msg; |
| return; |
| } |
| |
| last_pending_txn_id = 0; |
| // there maybe concurrent commit_txn_eventually, so we need continue to make sure |
| // partition versionPB has no txn_id |
| continue; |
| } |
| |
| std::string info_val; |
| const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); |
| err = txn->get(info_key, &info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id |
| << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| TxnInfoPB txn_info; |
| if (!txn_info.ParseFromString(info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString(); |
| |
| DCHECK(txn_info.txn_id() == txn_id); |
| if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { |
| code = MetaServiceCode::TXN_ALREADY_ABORTED; |
| ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) { |
| if (request->has_is_2pc() && request->is_2pc()) { |
| code = MetaServiceCode::TXN_ALREADY_VISIBLE; |
| ss << "transaction [" << txn_id << "] is already visible, not pre-committed."; |
| msg = ss.str(); |
| LOG(INFO) << msg; |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| return; |
| } |
| code = MetaServiceCode::OK; |
| ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(INFO) << msg; |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| return; |
| } |
| |
| if (request->has_is_2pc() && request->is_2pc() && |
| txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) { |
| code = MetaServiceCode::TXN_INVALID_STATUS; |
| ss << "transaction is prepare, not pre-committed: db_id=" << db_id << " txn_id" |
| << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| auto now_time = system_clock::now(); |
| uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count(); |
| if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) { |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id); |
| LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time() |
| << " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time; |
| return; |
| } |
| txn_info.set_commit_time(commit_time); |
| txn_info.set_finish_time(commit_time); |
| if (request->has_commit_attachment()) { |
| txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment()); |
| } |
| DCHECK(txn_info.status() != TxnStatusPB::TXN_STATUS_COMMITTED); |
| // set status TXN_STATUS_COMMITTED not TXN_STATUS_VISIBLE !!! |
| // lazy commit task will advance txn to make txn visible |
| txn_info.set_status(TxnStatusPB::TXN_STATUS_COMMITTED); |
| |
| LOG(INFO) << "after update txn_id= " << txn_id |
| << " txn_info=" << txn_info.ShortDebugString(); |
| info_val.clear(); |
| if (!txn_info.SerializeToString(&info_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_info when saving, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| txn->put(info_key, info_val); |
| LOG(INFO) << "put info_key=" << hex(info_key) << " txn_id=" << txn_id; |
| |
| if (txn_info.load_job_source_type() == |
| LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) { |
| put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id); |
| } |
| |
| // save versions for partition |
| int64_t version_update_time_ms = |
| duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
| response->set_version_update_time_ms(version_update_time_ms); |
| for (auto& i : new_versions) { |
| std::string ver_val; |
| VersionPB version_pb; |
| version_pb.add_pending_txn_ids(txn_id); |
| version_pb.set_update_time_ms(version_update_time_ms); |
| if (i.second > 1) { |
| version_pb.set_version(i.second); |
| } |
| |
| if (!version_pb.SerializeToString(&ver_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize version_pb when saving, txn_id=" << txn_id |
| << " partiton_key=" << hex(i.first); |
| msg = ss.str(); |
| return; |
| } |
| |
| txn->put(i.first, ver_val); |
| LOG(INFO) << "put partition_version_key=" << hex(i.first) << " version:" << i.second |
| << " txn_id=" << txn_id << " update_time=" << version_update_time_ms; |
| |
| std::string_view ver_key = i.first; |
| ver_key.remove_prefix(1); // Remove key space |
| // PartitionVersionKeyInfo {instance_id, db_id, table_id, partition_id} |
| std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; |
| int ret = decode_key(&ver_key, &out); |
| if (ret != 0) [[unlikely]] { |
| // decode version key error means this is something wrong, |
| // we can not continue this txn |
| LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(ver_key); |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = "decode version key error"; |
| return; |
| } |
| |
| int64_t table_id = std::get<int64_t>(std::get<0>(out[4])); |
| int64_t partition_id = std::get<int64_t>(std::get<0>(out[5])); |
| VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id |
| << " partition_id=" << partition_id << " version=" << i.second; |
| |
| response->add_table_ids(table_id); |
| response->add_partition_ids(partition_id); |
| response->add_versions(i.second + 1); |
| } |
| |
| process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code; |
| return; |
| } |
| |
| // Save table versions |
| for (auto& i : table_id_tablet_ids) { |
| std::string ver_key = table_version_key({instance_id, db_id, i.first}); |
| txn->atomic_add(ver_key, 1); |
| LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) |
| << " txn_id=" << txn_id; |
| } |
| |
| VLOG_DEBUG << "put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes() |
| << " num_put_keys=" << txn->num_put_keys() |
| << " num_del_keys=" << txn->num_del_keys() |
| << " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id; |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_eventually::txn_lazy_committer_submit", |
| &txn_id); |
| std::shared_ptr<TxnLazyCommitTask> task = txn_lazy_committer->submit(instance_id, txn_id); |
| TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::txn_lazy_committer_wait", &txn_id); |
| std::pair<MetaServiceCode, std::string> ret = task->wait(); |
| if (ret.first != MetaServiceCode::OK) { |
| LOG(WARNING) << "txn lazy commit failed txn_id=" << txn_id << " code=" << ret.first |
| << " msg=" << ret.second; |
| } |
| |
| std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats |
| for (auto& [_, i] : tmp_rowsets_meta) { |
| // Accumulate affected rows |
| auto& stats = tablet_stats[i.tablet_id()]; |
| stats.data_size += i.total_disk_size(); |
| stats.num_rows += i.num_rows(); |
| ++stats.num_rowsets; |
| stats.num_segs += i.num_segments(); |
| stats.index_size += i.index_disk_size(); |
| stats.segment_size += i.data_disk_size(); |
| } |
| |
| // calculate table stats from tablets stats |
| std::map<int64_t /*table_id*/, TableStats> table_stats; |
| std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(), |
| request->base_tablet_ids().end()); |
| calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids); |
| for (const auto& pair : table_stats) { |
| TableStatsPB* stats_pb = response->add_table_stats(); |
| auto table_id = pair.first; |
| auto stats = pair.second; |
| get_pb_from_tablestats(stats, stats_pb); |
| stats_pb->set_table_id(table_id); |
| VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id |
| << " table_id=" << table_id |
| << " updated_row_count=" << stats_pb->updated_row_count(); |
| } |
| |
| // txn set visible for fe callback |
| txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE); |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::finish", &code, &txn_id); |
| break; |
| } while (true); |
| } |
| |
| /** |
| * This process is generally the same as commit_txn, the difference is that |
| * the partitions version will plus 1 in multi sub txns. |
| * |
| * One example: |
| * Suppose the table, partition, tablet and version info is: |
| * -------------------------------------------- |
| * | table | partition | tablet | version | |
| * -------------------------------------------- |
| * | t1 | t1_p1 | t1_p1.1 | 1 | |
| * | t1 | t1_p1 | t1_p1.2 | 1 | |
| * | t1 | t1_p2 | t1_p2.1 | 2 | |
| * | t2 | t2_p3 | t2_p3.1 | 3 | |
| * | t2 | t2_p4 | t2_p4.1 | 4 | |
| * -------------------------------------------- |
| * |
| * Now we commit a txn with 3 sub txns and the tablets are: |
| * sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1 |
| * sub_txn2: t2_p3.1 |
| * sub_txn3: t1_p1.1, t1_p1.2 |
| * When commit, the partitions version will be: |
| * sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3) |
| * sub_txn2: t2_p3(3 -> 4) |
| * sub_txn3: t1_p1(2 -> 3) |
| * After commit, the partitions version will be: |
| * t1: t1_p1(3), t1_p2(3) |
| * t2: t2_p3(4), t2_p4(4) |
| */ |
| void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* response, |
| std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, |
| std::string& msg, const std::string& instance_id) { |
| std::stringstream ss; |
| int64_t txn_id = request->txn_id(); |
| auto sub_txn_infos = request->sub_txn_infos(); |
| // Create a readonly txn for scan tmp rowset |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "filed to create txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| // Get db id with txn id |
| std::string index_val; |
| const std::string index_key = txn_index_key({instance_id, txn_id}); |
| err = txn->get(index_key, &index_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get db id, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| TxnIndexPB index_pb; |
| if (!index_pb.ParseFromString(index_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_index_pb, txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| DCHECK(index_pb.has_tablet_index() == true); |
| DCHECK(index_pb.tablet_index().has_db_id() == true); |
| int64_t db_id = index_pb.tablet_index().db_id(); |
| |
| // Get temporary rowsets involved in the txn |
| std::map<int64_t, std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>> |
| sub_txn_to_tmp_rowsets_meta; |
| for (const auto& sub_txn_info : sub_txn_infos) { |
| auto sub_txn_id = sub_txn_info.sub_txn_id(); |
| // This is a range scan |
| MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, sub_txn_id, 0}; |
| MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, sub_txn_id + 1, 0}; |
| std::string rs_tmp_key0; |
| std::string rs_tmp_key1; |
| meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0); |
| meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1); |
| // Get rowset meta that should be commited |
| // tmp_rowset_key -> rowset_meta |
| std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowsets_meta; |
| |
| int num_rowsets = 0; |
| std::unique_ptr<int, std::function<void(int*)>> defer_log_range( |
| (int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets, &txn_id, &sub_txn_id](int*) { |
| LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id |
| << ", sub_txn_id=" << sub_txn_id << " num_rowsets=" << num_rowsets |
| << " range=[" << hex(rs_tmp_key0) << "," << hex(rs_tmp_key1) << ")"; |
| }); |
| |
| std::unique_ptr<RangeGetIterator> it; |
| do { |
| err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true); |
| if (err == TxnErrorCode::TXN_TOO_OLD) { |
| err = txn_kv->create_txn(&txn); |
| if (err == TxnErrorCode::TXN_OK) { |
| err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true); |
| } |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "internal error, failed to get tmp rowset while committing, txn_id=" << txn_id |
| << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| while (it->has_next()) { |
| auto [k, v] = it->next(); |
| LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << " txn_id=" << txn_id; |
| tmp_rowsets_meta.emplace_back(); |
| if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(), v.size())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "malformed rowset meta, unable to initialize, txn_id=" << txn_id |
| << " key=" << hex(k); |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| // Save keys that will be removed later |
| tmp_rowsets_meta.back().first = std::string(k.data(), k.size()); |
| ++num_rowsets; |
| if (!it->has_next()) rs_tmp_key0 = k; |
| } |
| rs_tmp_key0.push_back('\x00'); // Update to next smallest key for iteration |
| } while (it->more()); |
| |
| VLOG_DEBUG << "txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id |
| << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta.size(); |
| sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id, std::move(tmp_rowsets_meta)); |
| } |
| |
| // Create a read/write txn for guarantee consistency |
| txn.reset(); |
| err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "filed to create txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| // Get txn info with db_id and txn_id |
| std::string info_val; // Will be reused when saving updated txn |
| const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); |
| err = txn->get(info_key, &info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| TxnInfoPB txn_info; |
| if (!txn_info.ParseFromString(info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| // TODO: do more check like txn state |
| DCHECK(txn_info.txn_id() == txn_id); |
| if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { |
| code = MetaServiceCode::TXN_ALREADY_ABORTED; |
| ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) { |
| code = MetaServiceCode::OK; |
| ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(INFO) << msg; |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| return; |
| } |
| |
| LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString(); |
| |
| // Prepare rowset meta and new_versions |
| // Read tablet indexes in batch. |
| std::map<int64_t, int64_t> tablet_id_to_idx; |
| std::vector<std::string> tablet_idx_keys; |
| std::vector<int64_t> partition_ids; |
| auto idx = 0; |
| for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) { |
| for (auto& [_, i] : tmp_rowsets_meta) { |
| auto tablet_id = i.tablet_id(); |
| if (tablet_id_to_idx.count(tablet_id) == 0) { |
| tablet_id_to_idx.emplace(tablet_id, idx); |
| tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()})); |
| partition_ids.push_back(i.partition_id()); |
| idx++; |
| } |
| } |
| } |
| std::vector<std::optional<std::string>> tablet_idx_values; |
| err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, Transaction::BatchGetOptions(false)); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get tablet table index ids, err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg << " txn_id=" << txn_id; |
| return; |
| } |
| |
| // tablet_id -> {table/index/partition}_id |
| std::unordered_map<int64_t, TabletIndexPB> tablet_ids; |
| // table_id -> tablets_ids |
| std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids; |
| for (auto [tablet_id, i] : tablet_id_to_idx) { |
| if (!tablet_idx_values[i].has_value()) [[unlikely]] { |
| // The value must existed |
| code = MetaServiceCode::KV_TXN_GET_ERR; |
| ss << "failed to get tablet table index ids, err=not found" |
| << " tablet_id=" << tablet_id << " key=" << hex(tablet_idx_keys[i]); |
| msg = ss.str(); |
| LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id; |
| return; |
| } |
| if (!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "malformed tablet index value tablet_id=" << tablet_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id); |
| VLOG_DEBUG << "tablet_id:" << tablet_id |
| << " value:" << tablet_ids[tablet_id].ShortDebugString(); |
| } |
| |
| tablet_idx_keys.clear(); |
| tablet_idx_values.clear(); |
| |
| // {table/partition} -> version |
| std::unordered_map<std::string, uint64_t> new_versions; |
| std::vector<std::string> version_keys; |
| for (auto& [tablet_id, i] : tablet_id_to_idx) { |
| int64_t table_id = tablet_ids[tablet_id].table_id(); |
| int64_t partition_id = partition_ids[i]; |
| std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id}); |
| if (new_versions.count(ver_key) == 0) { |
| new_versions.insert({ver_key, 0}); |
| LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key) << " txn_id=" << txn_id |
| << ", db_id=" << db_id << ", table_id=" << table_id |
| << ", partition_id=" << partition_id; |
| version_keys.push_back(std::move(ver_key)); |
| } |
| } |
| std::vector<std::optional<std::string>> version_values; |
| err = txn->batch_get(&version_values, version_keys); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get partition versions, err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg << " txn_id=" << txn_id; |
| return; |
| } |
| size_t total_versions = version_keys.size(); |
| for (size_t i = 0; i < total_versions; i++) { |
| int64_t version; |
| if (version_values[i].has_value()) { |
| VersionPB version_pb; |
| if (!version_pb.ParseFromString(version_values[i].value())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse version pb txn_id=" << txn_id |
| << " key=" << hex(version_keys[i]); |
| msg = ss.str(); |
| return; |
| } |
| version = version_pb.version(); |
| } else { |
| version = 1; |
| } |
| new_versions[version_keys[i]] = version; |
| LOG(INFO) << "xxx get partition_version_key=" << hex(version_keys[i]) |
| << " version:" << version << " txn_id=" << txn_id; |
| } |
| version_keys.clear(); |
| version_values.clear(); |
| |
| std::vector<std::pair<std::string, std::string>> rowsets; |
| std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats |
| for (const auto& sub_txn_info : sub_txn_infos) { |
| auto sub_txn_id = sub_txn_info.sub_txn_id(); |
| auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id]; |
| std::unordered_map<int64_t, int64_t> partition_id_to_version; |
| for (auto& [_, i] : tmp_rowsets_meta) { |
| int64_t tablet_id = i.tablet_id(); |
| int64_t table_id = tablet_ids[tablet_id].table_id(); |
| int64_t partition_id = i.partition_id(); |
| std::string ver_key = |
| partition_version_key({instance_id, db_id, table_id, partition_id}); |
| if (new_versions.count(ver_key) == 0) [[unlikely]] { |
| // it is impossible. |
| code = MetaServiceCode::UNDEFINED_ERR; |
| ss << "failed to get partition version key, the target version not exists in " |
| "new_versions." |
| << " txn_id=" << txn_id << ", db_id=" << db_id << ", table_id=" << table_id |
| << ", partition_id=" << partition_id; |
| msg = ss.str(); |
| LOG(ERROR) << msg; |
| return; |
| } |
| |
| // Update rowset version |
| int64_t new_version = new_versions[ver_key]; |
| if (partition_id_to_version.count(partition_id) == 0) { |
| new_versions[ver_key] = new_version + 1; |
| new_version = new_versions[ver_key]; |
| partition_id_to_version[partition_id] = new_version; |
| } |
| i.set_start_version(new_version); |
| i.set_end_version(new_version); |
| LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id |
| << ", sub_txn_id=" << sub_txn_id << ", table_id=" << table_id |
| << ", partition_id=" << partition_id << ", tablet_id=" << tablet_id |
| << ", new_version=" << new_version; |
| |
| std::string key = meta_rowset_key({instance_id, tablet_id, i.end_version()}); |
| std::string val; |
| if (!i.SerializeToString(&val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize rowset_meta, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| rowsets.emplace_back(std::move(key), std::move(val)); |
| |
| // Accumulate affected rows |
| auto& stats = tablet_stats[tablet_id]; |
| stats.data_size += i.total_disk_size(); |
| stats.num_rows += i.num_rows(); |
| ++stats.num_rowsets; |
| stats.num_segs += i.num_segments(); |
| stats.index_size += i.index_disk_size(); |
| stats.segment_size += i.data_disk_size(); |
| } // for tmp_rowsets_meta |
| } |
| |
| process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code; |
| return; |
| } |
| |
| // Save rowset meta |
| for (auto& i : rowsets) { |
| size_t rowset_size = i.first.size() + i.second.size(); |
| txn->put(i.first, i.second); |
| LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" << txn_id |
| << " rowset_size=" << rowset_size; |
| } |
| |
| // Save versions |
| for (auto& i : new_versions) { |
| std::string ver_val; |
| VersionPB version_pb; |
| version_pb.set_version(i.second); |
| if (!version_pb.SerializeToString(&ver_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize version_pb when saving, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| txn->put(i.first, ver_val); |
| LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " version:" << i.second |
| << " txn_id=" << txn_id; |
| |
| std::string_view ver_key = i.first; |
| ver_key.remove_prefix(1); // Remove key space |
| // PartitionVersionKeyInfo {instance_id, db_id, table_id, partition_id} |
| std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; |
| int ret = decode_key(&ver_key, &out); |
| if (ret != 0) [[unlikely]] { |
| // decode version key error means this is something wrong, |
| // we can not continue this txn |
| LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(ver_key); |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = "decode version key error"; |
| return; |
| } |
| |
| int64_t table_id = std::get<int64_t>(std::get<0>(out[4])); |
| int64_t partition_id = std::get<int64_t>(std::get<0>(out[5])); |
| VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id |
| << " partition_id=" << partition_id << " version=" << i.second; |
| |
| response->add_table_ids(table_id); |
| response->add_partition_ids(partition_id); |
| response->add_versions(i.second); |
| } |
| |
| // Save table versions |
| for (auto& i : table_id_tablet_ids) { |
| std::string ver_key = table_version_key({instance_id, db_id, i.first}); |
| txn->atomic_add(ver_key, 1); |
| LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) << " txn_id=" << txn_id; |
| } |
| |
| LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString(); |
| |
| // Update txn_info |
| txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE); |
| |
| auto now_time = system_clock::now(); |
| uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count(); |
| if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) { |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id); |
| LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time() |
| << " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time; |
| return; |
| } |
| txn_info.set_commit_time(commit_time); |
| txn_info.set_finish_time(commit_time); |
| if (request->has_commit_attachment()) { |
| txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment()); |
| } |
| LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString(); |
| info_val.clear(); |
| if (!txn_info.SerializeToString(&info_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_info when saving, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| txn->put(info_key, info_val); |
| LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id; |
| |
| // Update stats of affected tablet |
| std::deque<std::string> kv_pool; |
| std::function<void(const StatsTabletKeyInfo&, const TabletStats&)> update_tablet_stats; |
| if (config::split_tablet_stats) { |
| update_tablet_stats = [&](const StatsTabletKeyInfo& info, const TabletStats& stats) { |
| if (stats.num_segs > 0) { |
| auto& data_size_key = kv_pool.emplace_back(); |
| stats_tablet_data_size_key(info, &data_size_key); |
| txn->atomic_add(data_size_key, stats.data_size); |
| auto& num_rows_key = kv_pool.emplace_back(); |
| stats_tablet_num_rows_key(info, &num_rows_key); |
| txn->atomic_add(num_rows_key, stats.num_rows); |
| auto& num_segs_key = kv_pool.emplace_back(); |
| stats_tablet_num_segs_key(info, &num_segs_key); |
| txn->atomic_add(num_segs_key, stats.num_segs); |
| auto& index_size_key = kv_pool.emplace_back(); |
| stats_tablet_index_size_key(info, &index_size_key); |
| txn->atomic_add(index_size_key, stats.index_size); |
| auto& segment_size_key = kv_pool.emplace_back(); |
| stats_tablet_segment_size_key(info, &segment_size_key); |
| txn->atomic_add(segment_size_key, stats.segment_size); |
| } |
| auto& num_rowsets_key = kv_pool.emplace_back(); |
| stats_tablet_num_rowsets_key(info, &num_rowsets_key); |
| txn->atomic_add(num_rowsets_key, stats.num_rowsets); |
| }; |
| } else { |
| update_tablet_stats = [&](const StatsTabletKeyInfo& info, const TabletStats& stats) { |
| auto& key = kv_pool.emplace_back(); |
| stats_tablet_key(info, &key); |
| auto& val = kv_pool.emplace_back(); |
| TxnErrorCode err = txn->get(key, &val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err, |
| std::get<4>(info)); |
| return; |
| } |
| TabletStatsPB stats_pb; |
| if (!stats_pb.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("malformed tablet stats value, key={}", hex(key)); |
| return; |
| } |
| stats_pb.set_data_size(stats_pb.data_size() + stats.data_size); |
| stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows); |
| stats_pb.set_num_rowsets(stats_pb.num_rowsets() + stats.num_rowsets); |
| stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs); |
| stats_pb.set_index_size(stats_pb.index_size() + stats.index_size); |
| stats_pb.set_segment_size(stats_pb.segment_size() + stats.segment_size); |
| stats_pb.SerializeToString(&val); |
| txn->put(key, val); |
| LOG(INFO) << "put stats_tablet_key, key=" << hex(key); |
| }; |
| } |
| for (auto& [tablet_id, stats] : tablet_stats) { |
| DCHECK(tablet_ids.count(tablet_id)); |
| auto& tablet_idx = tablet_ids[tablet_id]; |
| StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), tablet_idx.index_id(), |
| tablet_idx.partition_id(), tablet_id}; |
| update_tablet_stats(info, stats); |
| if (code != MetaServiceCode::OK) return; |
| } |
| // Remove tmp rowset meta |
| for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) { |
| for (auto& [k, _] : tmp_rowsets_meta) { |
| txn->remove(k); |
| LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id; |
| } |
| } |
| |
| const std::string running_key = txn_running_key({instance_id, db_id, txn_id}); |
| LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" << txn_id; |
| txn->remove(running_key); |
| |
| std::string recycle_val; |
| std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id}); |
| RecycleTxnPB recycle_pb; |
| recycle_pb.set_creation_time(commit_time); |
| recycle_pb.set_label(txn_info.label()); |
| |
| if (!recycle_pb.SerializeToString(&recycle_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize recycle_pb, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| txn->put(recycle_key, recycle_val); |
| LOG(INFO) << "xxx commit_txn put recycle_txn_key key=" << hex(recycle_key) |
| << " txn_id=" << txn_id; |
| |
| LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes() |
| << " num_put_keys=" << txn->num_put_keys() << " num_del_keys=" << txn->num_del_keys() |
| << " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id; |
| |
| // Finally we are done... |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| if (err == TxnErrorCode::TXN_VALUE_TOO_LARGE || err == TxnErrorCode::TXN_BYTES_TOO_LARGE) { |
| size_t max_size = 0, max_num_segments = 0, |
| min_num_segments = std::numeric_limits<size_t>::max(), avg_num_segments = 0; |
| std::pair<std::string, RowsetMetaCloudPB>* max_rowset_meta = nullptr; |
| for (auto& sub_txn : sub_txn_infos) { |
| auto it = sub_txn_to_tmp_rowsets_meta.find(sub_txn.sub_txn_id()); |
| if (it == sub_txn_to_tmp_rowsets_meta.end()) { |
| continue; |
| } |
| for (auto& rowset_meta : it->second) { |
| if (rowset_meta.second.ByteSizeLong() > max_size) { |
| max_size = rowset_meta.second.ByteSizeLong(); |
| max_rowset_meta = &rowset_meta; |
| } |
| if (rowset_meta.second.num_segments() > max_num_segments) { |
| max_num_segments = rowset_meta.second.num_segments(); |
| } |
| if (rowset_meta.second.num_segments() < min_num_segments) { |
| min_num_segments = rowset_meta.second.num_segments(); |
| } |
| avg_num_segments += rowset_meta.second.num_segments(); |
| } |
| if (!it->second.empty()) { |
| avg_num_segments /= it->second.size(); |
| } |
| } |
| if (max_rowset_meta) { |
| LOG(WARNING) << "failed to commit kv txn with sub txn" |
| << ", err=" << err << ", txn_id=" << txn_id |
| << ", total_rowsets=" << rowsets.size() |
| << ", avg_num_segments=" << avg_num_segments |
| << ", min_num_segments=" << min_num_segments |
| << ", max_num_segments=" << max_num_segments |
| << ", largest_rowset_size=" << max_size |
| << ", largest_rowset_key=" << hex(max_rowset_meta->first) |
| << ", largest_rowset_value=" |
| << max_rowset_meta->second.ShortDebugString(); |
| } |
| } |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit kv txn with sub txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| // calculate table stats from tablets stats |
| std::map<int64_t /*table_id*/, TableStats> table_stats; |
| std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(), |
| request->base_tablet_ids().end()); |
| calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids); |
| for (const auto& pair : table_stats) { |
| TableStatsPB* stats_pb = response->add_table_stats(); |
| auto table_id = pair.first; |
| auto stats = pair.second; |
| get_pb_from_tablestats(stats, stats_pb); |
| stats_pb->set_table_id(table_id); |
| VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id |
| << " table_id=" << table_id |
| << " updated_row_count=" << stats_pb->updated_row_count(); |
| } |
| |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| } // end commit_txn_with_sub_txn |
| |
| static bool fuzzy_random() { |
| return std::chrono::steady_clock::now().time_since_epoch().count() & 0x01; |
| } |
| |
| static bool force_txn_lazy_commit() { |
| if (config::enable_cloud_txn_lazy_commit_fuzzy_test) [[unlikely]] { |
| return fuzzy_random(); |
| } |
| return false; |
| } |
| |
| void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, |
| const CommitTxnRequest* request, CommitTxnResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(commit_txn); |
| if (!request->has_txn_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invalid argument, missing txn id"; |
| return; |
| } |
| |
| int64_t txn_id = request->txn_id(); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id << " txn_id=" << txn_id; |
| return; |
| } |
| RPC_RATE_LIMIT(commit_txn) |
| |
| if (request->has_is_txn_load() && request->is_txn_load()) { |
| commit_txn_with_sub_txn(request, response, txn_kv_, code, msg, instance_id); |
| return; |
| } |
| |
| int64_t db_id; |
| std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowsets_meta; |
| scan_tmp_rowset(instance_id, txn_id, txn_kv_, code, msg, &db_id, &tmp_rowsets_meta); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id << " code=" << code; |
| return; |
| } |
| |
| TxnErrorCode err = TxnErrorCode::TXN_OK; |
| bool enable_txn_lazy_commit_feature = |
| (request->has_is_2pc() && !request->is_2pc() && request->has_enable_txn_lazy_commit() && |
| request->enable_txn_lazy_commit() && config::enable_cloud_txn_lazy_commit); |
| |
| while ((!enable_txn_lazy_commit_feature || |
| (tmp_rowsets_meta.size() <= config::txn_lazy_commit_rowsets_thresold))) { |
| if (force_txn_lazy_commit()) { |
| LOG(INFO) << "fuzzy test force_txn_lazy_commit, txn_id=" << txn_id; |
| break; |
| } |
| |
| commit_txn_immediately(request, response, txn_kv_, txn_lazy_committer_, code, msg, |
| instance_id, db_id, tmp_rowsets_meta, err); |
| |
| if (MetaServiceCode::OK == code) { |
| return; |
| } |
| |
| if (TxnErrorCode::TXN_BYTES_TOO_LARGE != err) { |
| return; |
| } |
| |
| if (!enable_txn_lazy_commit_feature) { |
| if (err == TxnErrorCode::TXN_BYTES_TOO_LARGE) { |
| msg += ", likely due to committing too many tablets. " |
| "Please reduce the number of partitions involved in the load."; |
| } |
| return; |
| } |
| |
| DCHECK(code != MetaServiceCode::OK); |
| DCHECK(enable_txn_lazy_commit_feature); |
| DCHECK(err == TxnErrorCode::TXN_BYTES_TOO_LARGE); |
| LOG(INFO) << "txn_id=" << txn_id << " fallthrough commit_txn_eventually"; |
| break; |
| } |
| |
| LOG(INFO) << "txn_id=" << txn_id << " commit_txn_eventually" |
| << " tmp_rowsets_meta.size=" << tmp_rowsets_meta.size(); |
| code = MetaServiceCode::OK; |
| msg.clear(); |
| commit_txn_eventually(request, response, txn_kv_, txn_lazy_committer_, code, msg, instance_id, |
| db_id, tmp_rowsets_meta); |
| } |
| |
| static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* request, |
| Transaction* txn, TxnInfoPB& return_txn_info, std::stringstream& ss, |
| MetaServiceCode& code, std::string& msg) { |
| int64_t txn_id = request->txn_id(); |
| std::string label = request->label(); |
| int64_t db_id = request->db_id(); |
| |
| std::string info_key; // Will be used when saving updated txn |
| std::string info_val; // Will be reused when saving updated txn |
| TxnErrorCode err; |
| //TODO: split with two function. |
| //there two ways to abort txn: |
| //1. abort txn by txn id |
| //2. abort txn by label and db_id |
| if (txn_id > 0) { |
| VLOG_DEBUG << "abort_txn by txn_id, txn_id=" << txn_id; |
| //abort txn by txn id |
| // Get db id with txn id |
| |
| std::string index_key; |
| std::string index_val; |
| //not provide db_id, we need read from disk. |
| if (db_id == 0) { |
| index_key = txn_index_key({instance_id, txn_id}); |
| err = txn->get(index_key, &index_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| ss << "transaction [" << txn_id << "] not found"; |
| } else { |
| ss << "failed to get txn info, txn_id=" << txn_id << " err=" << err; |
| } |
| msg = ss.str(); |
| return; |
| } |
| |
| TxnIndexPB index_pb; |
| if (!index_pb.ParseFromString(index_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_index_val" |
| << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| DCHECK(index_pb.has_tablet_index() == true); |
| DCHECK(index_pb.tablet_index().has_db_id() == true); |
| db_id = index_pb.tablet_index().db_id(); |
| } |
| |
| // Get txn info with db_id and txn_id |
| info_key = txn_info_key({instance_id, db_id, txn_id}); |
| err = txn->get(info_key, &info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| ss << "failed to get txn_info, db_id=" << db_id << "txn_id=" << txn_id << "err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (!return_txn_info.ParseFromString(info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_info db_id=" << db_id << "txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| DCHECK(return_txn_info.txn_id() == txn_id); |
| |
| //check state is valid. |
| if (return_txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { |
| code = MetaServiceCode::TXN_ALREADY_ABORTED; |
| ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id; |
| msg = ss.str(); |
| return; |
| } |
| if (return_txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) { |
| code = MetaServiceCode::TXN_ALREADY_VISIBLE; |
| ss << "transaction [" << txn_id << "] is already VISIBLE, db_id=" << db_id; |
| msg = ss.str(); |
| return; |
| } |
| } else { |
| VLOG_DEBUG << "abort_txn db_id and label, db_id=" << db_id << " label=" << label; |
| //abort txn by label. |
| std::string label_key = txn_label_key({instance_id, db_id, label}); |
| std::string label_val; |
| err = txn->get(label_key, &label_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "txn->get() failed, label=" << label << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| //label index not exist |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = MetaServiceCode::TXN_LABEL_NOT_FOUND; |
| ss << "label not found, db_id=" << db_id << " label=" << label << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| TxnLabelPB label_pb; |
| DCHECK(label_val.size() > VERSION_STAMP_LEN); |
| if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "txn_label_pb->ParseFromString() failed, label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| |
| int64_t prepare_txn_id = 0; |
| //found prepare state txn for abort |
| for (auto& cur_txn_id : label_pb.txn_ids()) { |
| std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id}); |
| std::string cur_info_val; |
| err = txn->get(cur_info_key, &cur_info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| std::stringstream ss; |
| ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| // ret == 0 |
| TxnInfoPB cur_txn_info; |
| if (!cur_txn_info.ParseFromString(cur_info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| std::stringstream ss; |
| ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id; |
| msg = ss.str(); |
| return; |
| } |
| VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString(); |
| //TODO: 2pc else need to check TxnStatusPB::TXN_STATUS_PRECOMMITTED |
| if ((cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) || |
| (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PRECOMMITTED)) { |
| prepare_txn_id = cur_txn_id; |
| return_txn_info = std::move(cur_txn_info); |
| info_key = std::move(cur_info_key); |
| DCHECK_EQ(prepare_txn_id, return_txn_info.txn_id()) |
| << "prepare_txn_id=" << prepare_txn_id |
| << " txn_id=" << return_txn_info.txn_id(); |
| break; |
| } |
| } |
| |
| if (prepare_txn_id == 0) { |
| code = MetaServiceCode::TXN_INVALID_STATUS; |
| std::stringstream ss; |
| ss << "running transaction not found, db_id=" << db_id << " label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| auto now_time = system_clock::now(); |
| uint64_t finish_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count(); |
| |
| // Update txn_info |
| return_txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED); |
| return_txn_info.set_finish_time(finish_time); |
| request->has_reason() ? return_txn_info.set_reason(request->reason()) |
| : return_txn_info.set_reason("User Abort"); |
| |
| if (request->has_commit_attachment()) { |
| return_txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment()); |
| } |
| |
| info_val.clear(); |
| if (!return_txn_info.SerializeToString(&info_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_info when saving, txn_id=" << return_txn_info.txn_id(); |
| msg = ss.str(); |
| return; |
| } |
| LOG(INFO) << "check watermark conflict, txn_info=" << return_txn_info.ShortDebugString(); |
| txn->put(info_key, info_val); |
| LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << return_txn_info.txn_id(); |
| |
| std::string running_key = txn_running_key({instance_id, db_id, return_txn_info.txn_id()}); |
| txn->remove(running_key); |
| LOG(INFO) << "xxx remove running_key=" << hex(running_key) |
| << " txn_id=" << return_txn_info.txn_id(); |
| |
| std::string recycle_key = recycle_txn_key({instance_id, db_id, return_txn_info.txn_id()}); |
| std::string recycle_val; |
| RecycleTxnPB recycle_pb; |
| recycle_pb.set_creation_time(finish_time); |
| recycle_pb.set_label(return_txn_info.label()); |
| |
| if (!recycle_pb.SerializeToString(&recycle_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize recycle_pb, txn_id=" << return_txn_info.txn_id(); |
| msg = ss.str(); |
| return; |
| } |
| txn->put(recycle_key, recycle_val); |
| LOG(INFO) << "put recycle_txn_key=" << hex(recycle_key) |
| << " txn_id=" << return_txn_info.txn_id(); |
| } |
| |
| void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, |
| const AbortTxnRequest* request, AbortTxnResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(abort_txn); |
| // Get txn id |
| int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1; |
| std::string label = request->has_label() ? request->label() : ""; |
| int64_t db_id = request->has_db_id() ? request->db_id() : -1; |
| if (txn_id < 0 && (label.empty() || db_id < 0)) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "invalid txn id and label, db_id=" << db_id << " txn_id=" << txn_id |
| << " label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "cannot find instance_id with cloud_unique_id=" |
| << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " label=" << label |
| << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| RPC_RATE_LIMIT(abort_txn); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "filed to txn_kv_->create_txn(), txn_id=" << txn_id << " label=" << label |
| << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| TxnInfoPB txn_info; |
| |
| _abort_txn(instance_id, request, txn.get(), txn_info, ss, code, msg); |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit kv txn, txn_id=" << txn_info.txn_id() << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| } |
| |
| void MetaServiceImpl::get_txn(::google::protobuf::RpcController* controller, |
| const GetTxnRequest* request, GetTxnResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_txn); |
| int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1; |
| int64_t db_id = request->has_db_id() ? request->db_id() : -1; |
| std::string label = request->has_label() ? request->label() : ""; |
| if (txn_id < 0 && label.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "invalid txn_id, it may be not given or set properly, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "cannot find instance_id with cloud_unique_id=" |
| << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id); |
| msg = ss.str(); |
| return; |
| } |
| |
| RPC_RATE_LIMIT(get_txn) |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to create txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (!label.empty()) { |
| //step 1: get label |
| const std::string label_key = txn_label_key({instance_id, db_id, label}); |
| std::string label_val; |
| err = txn->get(label_key, &label_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "txn->get failed(), err=" << err << " label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label |
| << " err=" << err; |
| // step 2: get txn info from label pb |
| TxnLabelPB label_pb; |
| if (err == TxnErrorCode::TXN_OK) { |
| if (label_val.size() <= VERSION_STAMP_LEN || |
| !label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "label_pb->ParseFromString() failed, txn_id=" << txn_id << " label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| for (auto& cur_txn_id : label_pb.txn_ids()) { |
| if (cur_txn_id > txn_id) { |
| txn_id = cur_txn_id; |
| } |
| } |
| } else { |
| code = MetaServiceCode::TXN_ID_NOT_FOUND; |
| ss << "Label [" << label << "] has not found"; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| //not provide db_id, we need read from disk. |
| if (db_id < 0) { |
| const std::string index_key = txn_index_key({instance_id, txn_id}); |
| std::string index_val; |
| err = txn->get(index_key, &index_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| ss << "failed to get db id with txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| TxnIndexPB index_pb; |
| if (!index_pb.ParseFromString(index_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_inf" |
| << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| DCHECK(index_pb.has_tablet_index() == true); |
| DCHECK(index_pb.tablet_index().has_db_id() == true); |
| db_id = index_pb.tablet_index().db_id(); |
| if (db_id <= 0) { |
| ss << "internal error: unexpected db_id " << db_id; |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| // Get txn info with db_id and txn_id |
| const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); |
| std::string info_val; |
| err = txn->get(info_key, &info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| ss << "failed to get db id with db_id=" << db_id << " txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| TxnInfoPB txn_info; |
| if (!txn_info.ParseFromString(info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_info db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| VLOG_DEBUG << "txn_info=" << txn_info.ShortDebugString(); |
| DCHECK(txn_info.txn_id() == txn_id); |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| } |
| |
| //To get current max txn id for schema change watermark etc. |
| void MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController* controller, |
| const GetCurrentMaxTxnRequest* request, |
| GetCurrentMaxTxnResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_current_max_txn_id); |
| // TODO: For auth |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| RPC_RATE_LIMIT(get_current_max_txn_id) |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| msg = "failed to create txn"; |
| code = cast_as<ErrCategory::CREATE>(err); |
| return; |
| } |
| |
| const std::string key = "schema change"; |
| std::string val; |
| err = txn->get(key, &val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| std::stringstream ss; |
| ss << "txn->get() failed, err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| int64_t read_version = 0; |
| err = txn->get_read_version(&read_version); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| std::stringstream ss; |
| ss << "get read version failed, ret=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| int64_t current_max_txn_id = read_version << 10; |
| VLOG_DEBUG << "read_version=" << read_version << " current_max_txn_id=" << current_max_txn_id; |
| response->set_current_max_txn_id(current_max_txn_id); |
| } |
| |
| /** |
| * 1. Generate a sub_txn_id |
| * |
| * The following steps are done in a txn: |
| * 2. Put txn_index_key in sub_txn_id |
| * 3. Delete txn_label_key in sub_txn_id |
| * 4. Modify the txn state of the txn_id: |
| * - Add the sub txn id to sub_txn_ids: recycler use it to recycle the txn_index_key |
| * - Add the table id to table_ids |
| */ |
| void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controller, |
| const BeginSubTxnRequest* request, |
| BeginSubTxnResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(begin_sub_txn); |
| int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1; |
| int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num() : -1; |
| int64_t db_id = request->has_db_id() ? request->db_id() : -1; |
| auto& table_ids = request->table_ids(); |
| std::string label = request->has_label() ? request->label() : ""; |
| if (txn_id < 0 || sub_txn_num < 0 || db_id < 0 || table_ids.empty() || label.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_num=" << sub_txn_num |
| << " db_id=" << db_id << ", label=" << label << ", table_ids=["; |
| for (auto table_id : table_ids) { |
| ss << table_id << ", "; |
| } |
| ss << "]"; |
| msg = ss.str(); |
| return; |
| } |
| |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "cannot find instance_id with cloud_unique_id=" |
| << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id); |
| msg = ss.str(); |
| return; |
| } |
| |
| RPC_RATE_LIMIT(begin_sub_txn) |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id |
| << " db_id=" << db_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| const std::string label_key = txn_label_key({instance_id, db_id, label}); |
| std::string label_val; |
| err = txn->get(label_key, &label_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "txn->get failed(), err=" << err << " label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| |
| LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err; |
| |
| // err == OK means this is a retry rpc? |
| if (err == TxnErrorCode::TXN_OK) { |
| label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN); |
| } |
| |
| // ret > 0, means label not exist previously. |
| txn->atomic_set_ver_value(label_key, label_val); |
| LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key); |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "txn->commit failed(), label=" << label << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| // 2. Get sub txn id from version stamp |
| txn.reset(); |
| err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to create txn when get txn id, label=" << label << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| label_val.clear(); |
| err = txn->get(label_key, &label_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "txn->get() failed, label=" << label << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err; |
| |
| // Generated by TxnKv system |
| int64_t sub_txn_id = 0; |
| int ret = |
| get_txn_id_from_fdb_ts(std::string_view(label_val).substr( |
| label_val.size() - VERSION_STAMP_LEN, label_val.size()), |
| &sub_txn_id); |
| if (ret != 0) { |
| code = MetaServiceCode::TXN_GEN_ID_ERR; |
| ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << ret; |
| msg = ss.str(); |
| return; |
| } |
| |
| LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " sub_txn_id=" << sub_txn_id |
| << " txn_id=" << txn_id << " label_val.size()=" << label_val.size(); |
| |
| // write txn_index_key |
| const std::string index_key = txn_index_key({instance_id, sub_txn_id}); |
| std::string index_val; |
| TxnIndexPB index_pb; |
| index_pb.mutable_tablet_index()->set_db_id(db_id); |
| if (!index_pb.SerializeToString(&index_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_index_pb " |
| << "label=" << label << " txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| // Get and update txn info with db_id and txn_id |
| std::string info_val; // Will be reused when saving updated txn |
| const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); |
| err = txn->get(info_key, &info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| TxnInfoPB txn_info; |
| if (!txn_info.ParseFromString(info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| DCHECK(txn_info.txn_id() == txn_id); |
| if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) { |
| code = MetaServiceCode::TXN_INVALID_STATUS; |
| ss << "transaction status is " << txn_info.status() << " : db_id=" << db_id |
| << " txn_id=" << txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| if (txn_info.sub_txn_ids().size() != sub_txn_num) { |
| code = MetaServiceCode::UNDEFINED_ERR; |
| ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", expected sub_txn_num=" << sub_txn_num |
| << ", txn_info.sub_txn_ids=["; |
| for (auto id : txn_info.sub_txn_ids()) { |
| ss << id << ", "; |
| } |
| ss << "]"; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| } |
| txn_info.mutable_sub_txn_ids()->Add(sub_txn_id); |
| txn_info.mutable_table_ids()->Clear(); |
| for (auto table_id : table_ids) { |
| txn_info.mutable_table_ids()->Add(table_id); |
| } |
| if (!txn_info.SerializeToString(&info_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_info when saving, txn_id=" << txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| txn->remove(label_key); |
| txn->put(info_key, info_val); |
| txn->put(index_key, index_val); |
| LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id |
| << ", remove label_key=" << hex(label_key) << ", put info_key=" << hex(info_key) |
| << ", put index_key=" << hex(index_key); |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| response->set_sub_txn_id(sub_txn_id); |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| } |
| |
| /** |
| * 1. Modify the txn state of the txn_id: |
| * - Remove the table id from table_ids |
| */ |
| void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controller, |
| const AbortSubTxnRequest* request, |
| AbortSubTxnResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(abort_sub_txn); |
| int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1; |
| int64_t sub_txn_id = request->has_sub_txn_id() ? request->sub_txn_id() : -1; |
| int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num() : -1; |
| int64_t db_id = request->has_db_id() ? request->db_id() : -1; |
| auto& table_ids = request->table_ids(); |
| if (txn_id < 0 || sub_txn_id < 0 || sub_txn_num < 0 || db_id < 0) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id |
| << ", sub_txn_num=" << sub_txn_num << " db_id=" << db_id << ", table_ids=["; |
| for (auto table_id : table_ids) { |
| ss << table_id << ", "; |
| } |
| ss << "]"; |
| msg = ss.str(); |
| return; |
| } |
| |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "cannot find instance_id with cloud_unique_id=" |
| << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id); |
| msg = ss.str(); |
| return; |
| } |
| |
| RPC_RATE_LIMIT(abort_sub_txn) |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id |
| << " sub_txn_id=" << sub_txn_id << " db_id=" << db_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| // Get and update txn info with db_id and txn_id |
| std::string info_val; // Will be reused when saving updated txn |
| const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); |
| err = txn->get(info_key, &info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id |
| << " sub_txn_id=" << sub_txn_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| TxnInfoPB txn_info; |
| if (!txn_info.ParseFromString(info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id |
| << " sub_txn_id=" << sub_txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| DCHECK(txn_info.txn_id() == txn_id); |
| if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) { |
| code = MetaServiceCode::TXN_INVALID_STATUS; |
| ss << "transaction status is " << txn_info.status() << " : db_id=" << db_id |
| << " txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| // remove table_id and does not need to remove sub_txn_id |
| if (txn_info.sub_txn_ids().size() != sub_txn_num) { |
| code = MetaServiceCode::UNDEFINED_ERR; |
| ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id |
| << ", expected sub_txn_num=" << sub_txn_num << ", txn_info.sub_txn_ids=["; |
| for (auto id : txn_info.sub_txn_ids()) { |
| ss << id << ", "; |
| } |
| ss << "]"; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| } |
| txn_info.mutable_table_ids()->Clear(); |
| for (auto table_id : table_ids) { |
| txn_info.mutable_table_ids()->Add(table_id); |
| } |
| // TODO should we try to delete txn_label_key if begin_sub_txn failed to delete? |
| |
| if (!txn_info.SerializeToString(&info_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize txn_info when saving, txn_id=" << txn_id |
| << " sub_txn_id=" << sub_txn_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| txn->put(info_key, info_val); |
| LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id |
| << ", put info_key=" << hex(info_key); |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit kv txn, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id |
| << ", err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| response->mutable_txn_info()->CopyFrom(txn_info); |
| } |
| |
| void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcController* controller, |
| const AbortTxnWithCoordinatorRequest* request, |
| AbortTxnWithCoordinatorResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(abort_txn_with_coordinator); |
| if (!request->has_id() || !request->has_ip() || !request->has_start_time()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invalid coordinate id, coordinate ip or coordinate start time."; |
| return; |
| } |
| // TODO: For auth |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "cannot find instance_id with cloud_unique_id=" |
| << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id); |
| msg = ss.str(); |
| return; |
| } |
| RPC_RATE_LIMIT(abort_txn_with_coordinator); |
| std::string begin_info_key = txn_info_key({instance_id, 0, 0}); |
| std::string end_info_key = txn_info_key({instance_id, INT64_MAX, INT64_MAX}); |
| LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" << hex(end_info_key); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| msg = "failed to create txn"; |
| code = cast_as<ErrCategory::CREATE>(err); |
| return; |
| } |
| std::unique_ptr<RangeGetIterator> it; |
| int64_t abort_txn_cnt = 0; |
| int64_t total_iteration_cnt = 0; |
| bool need_commit = false; |
| do { |
| err = txn->get(begin_info_key, end_info_key, &it, true); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get txn info. err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| while (it->has_next()) { |
| total_iteration_cnt++; |
| auto [k, v] = it->next(); |
| VLOG_DEBUG << "check txn info txn_info_key=" << hex(k); |
| TxnInfoPB info_pb; |
| if (!info_pb.ParseFromArray(v.data(), v.size())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "malformed txn running info"; |
| msg = ss.str(); |
| ss << " key=" << hex(k); |
| LOG(WARNING) << ss.str(); |
| return; |
| } |
| const auto& coordinate = info_pb.coordinator(); |
| if (info_pb.status() == TxnStatusPB::TXN_STATUS_PREPARED && |
| coordinate.sourcetype() == TXN_SOURCE_TYPE_BE && coordinate.id() == request->id() && |
| coordinate.ip() == request->ip() && |
| coordinate.start_time() < request->start_time()) { |
| need_commit = true; |
| TxnInfoPB return_txn_info; |
| AbortTxnRequest request; |
| request.set_db_id(info_pb.db_id()); |
| request.set_txn_id(info_pb.txn_id()); |
| request.set_label(info_pb.label()); |
| request.set_reason("Abort because coordinate be restart/stop"); |
| _abort_txn(instance_id, &request, txn.get(), return_txn_info, ss, code, msg); |
| } |
| if (!it->has_next()) { |
| begin_info_key = k; |
| } |
| } |
| begin_info_key.push_back('\x00'); // Update to next smallest key for iteration |
| } while (it->more()); |
| LOG(INFO) << "abort txn count: " << abort_txn_cnt |
| << " total iteration count: " << total_iteration_cnt; |
| if (need_commit) { |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to abort txn kv, cooridnate_id=" << request->id() |
| << " coordinate_ip=" << request->ip() |
| << "coordinate_start_time=" << request->start_time() << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| } |
| } |
| |
| std::string get_txn_info_key_from_txn_running_key(std::string_view txn_running_key) { |
| std::string conflict_txn_info_key; |
| std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; |
| txn_running_key.remove_prefix(1); |
| int ret = decode_key(&txn_running_key, &out); |
| if (ret != 0) [[unlikely]] { |
| // decode version key error means this is something wrong, |
| // we can not continue this txn |
| LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(txn_running_key); |
| } else { |
| DCHECK(out.size() == 5) << " key=" << hex(txn_running_key) << " " << out.size(); |
| const std::string& decode_instance_id = std::get<1>(std::get<0>(out[1])); |
| int64_t db_id = std::get<0>(std::get<0>(out[3])); |
| int64_t txn_id = std::get<0>(std::get<0>(out[4])); |
| conflict_txn_info_key = txn_info_key({decode_instance_id, db_id, txn_id}); |
| } |
| return conflict_txn_info_key; |
| } |
| |
| void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* controller, |
| const CheckTxnConflictRequest* request, |
| CheckTxnConflictResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(check_txn_conflict); |
| if (!request->has_db_id() || !request->has_end_txn_id() || (request->table_ids_size() <= 0)) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invalid db id, end txn id or table_ids."; |
| return; |
| } |
| // TODO: For auth |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "cannot find instance_id with cloud_unique_id=" |
| << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id); |
| msg = ss.str(); |
| return; |
| } |
| RPC_RATE_LIMIT(check_txn_conflict) |
| int64_t db_id = request->db_id(); |
| |
| std::string begin_running_key = txn_running_key({instance_id, db_id, 0}); |
| std::string end_running_key = txn_running_key({instance_id, db_id, request->end_txn_id()}); |
| LOG(INFO) << "begin_running_key:" << hex(begin_running_key) |
| << " end_running_key:" << hex(end_running_key); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| msg = "failed to create txn"; |
| code = cast_as<ErrCategory::CREATE>(err); |
| return; |
| } |
| |
| //TODO: use set to replace |
| std::vector<int64_t> src_table_ids(request->table_ids().begin(), request->table_ids().end()); |
| std::sort(src_table_ids.begin(), src_table_ids.end()); |
| std::unique_ptr<RangeGetIterator> it; |
| int64_t skip_timeout_txn_cnt = 0; |
| int total_iteration_cnt = 0; |
| bool finished = true; |
| do { |
| err = txn->get(begin_running_key, end_running_key, &it, true); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get txn running info. err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| VLOG_DEBUG << "begin_running_key=" << hex(begin_running_key) |
| << " end_running_key=" << hex(end_running_key) |
| << " it->has_next()=" << it->has_next(); |
| |
| auto now_time = system_clock::now(); |
| uint64_t check_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count(); |
| while (it->has_next()) { |
| total_iteration_cnt++; |
| auto [k, v] = it->next(); |
| LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k); |
| TxnRunningPB running_pb; |
| if (!running_pb.ParseFromArray(v.data(), v.size())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "malformed txn running info"; |
| msg = ss.str(); |
| ss << " key=" << hex(k); |
| LOG(WARNING) << ss.str(); |
| return; |
| } |
| |
| if (running_pb.timeout_time() < check_time) { |
| skip_timeout_txn_cnt++; |
| } else { |
| LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k) << " " << k |
| << " running_pb=" << running_pb.ShortDebugString(); |
| std::vector<int64_t> running_table_ids(running_pb.table_ids().begin(), |
| running_pb.table_ids().end()); |
| std::sort(running_table_ids.begin(), running_table_ids.end()); |
| std::vector<int64_t> result( |
| std::min(running_table_ids.size(), src_table_ids.size())); |
| auto iter = std::set_intersection(src_table_ids.begin(), src_table_ids.end(), |
| running_table_ids.begin(), |
| running_table_ids.end(), result.begin()); |
| result.resize(iter - result.begin()); |
| if (!result.empty()) { |
| finished = false; |
| std::string conflict_txn_info_key = get_txn_info_key_from_txn_running_key(k); |
| if (!conflict_txn_info_key.empty()) { |
| std::string conflict_txn_info_val; |
| err = txn->get(conflict_txn_info_key, &conflict_txn_info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND |
| ? MetaServiceCode::TXN_ID_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| ss << "failed to get txn_info, conflict_txn_info_key=" |
| << hex(conflict_txn_info_key); |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| TxnInfoPB& conflict_txn_info = *response->add_conflict_txns(); |
| if (!conflict_txn_info.ParseFromString(conflict_txn_info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse txn_info, conflict_txn_info_key=" |
| << hex(conflict_txn_info_key); |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| } |
| } |
| } |
| |
| if (!it->has_next()) { |
| begin_running_key = k; |
| } |
| } |
| begin_running_key.push_back('\x00'); // Update to next smallest key for iteration |
| } while (it->more()); |
| LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt |
| << " conflict txn count: " << response->conflict_txns_size() |
| << " total iteration count: " << total_iteration_cnt; |
| response->set_finished(finished); |
| } |
| |
| /** |
| * @brief |
| * |
| * @param txn_kv |
| * @param instance_id |
| * @param db_id |
| * @param label_key |
| * @return TxnErrorCode |
| */ |
| TxnErrorCode internal_clean_label(std::shared_ptr<TxnKv> txn_kv, const std::string_view instance_id, |
| int64_t db_id, const std::string_view label_key) { |
| std::string label_val; |
| TxnLabelPB label_pb; |
| |
| int64_t key_size = 0; |
| int64_t val_size = 0; |
| std::vector<int64_t> survival_txn_ids; |
| std::vector<int64_t> clean_txn_ids; |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" << db_id |
| << " label_key=" << hex(label_key); |
| return err; |
| } |
| |
| err = txn->get(label_key, &label_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| LOG(WARNING) << "failed to txn get err=" << err << " db_id=" << db_id |
| << " label_key=" << hex(label_key); |
| return err; |
| } |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| LOG(INFO) << "txn get err=" << err << " db_id=" << db_id << " label_key=" << hex(label_key); |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| if (label_val.size() <= VERSION_STAMP_LEN) { |
| LOG(INFO) << "label_val.size()=" << label_val.size() << " db_id=" << db_id |
| << " label_key=" << hex(label_key); |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) { |
| LOG(WARNING) << "failed to parse txn label" |
| << " db_id=" << db_id << " label_key=" << hex(label_key) |
| << " label_val.size()=" << label_val.size(); |
| return TxnErrorCode::TXN_UNIDENTIFIED_ERROR; |
| } |
| |
| for (auto txn_id : label_pb.txn_ids()) { |
| const std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id}); |
| const std::string index_key = txn_index_key({instance_id, txn_id}); |
| const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); |
| |
| std::string info_val; |
| err = txn->get(info_key, &info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("info_key get failed") |
| .tag("info_key", hex(info_key)) |
| .tag("label_key", hex(label_key)) |
| .tag("db_id", db_id) |
| .tag("txn_id", txn_id) |
| .tag("err", err); |
| return err; |
| } |
| |
| TxnInfoPB txn_info; |
| if (!txn_info.ParseFromString(info_val)) { |
| LOG_WARNING("info_val parse failed") |
| .tag("info_key", hex(info_key)) |
| .tag("label_key", hex(label_key)) |
| .tag("db_id", db_id) |
| .tag("txn_id", txn_id) |
| .tag("size", info_val.size()); |
| return TxnErrorCode::TXN_UNIDENTIFIED_ERROR; |
| } |
| |
| std::string recycle_val; |
| if ((txn_info.status() != TxnStatusPB::TXN_STATUS_ABORTED) && |
| (txn_info.status() != TxnStatusPB::TXN_STATUS_VISIBLE)) { |
| // txn status is not final status |
| LOG(INFO) << "txn not final state, label_key=" << hex(label_key) |
| << " txn_id=" << txn_id; |
| survival_txn_ids.push_back(txn_id); |
| DCHECK_EQ(txn->get(recycle_key, &recycle_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| continue; |
| } |
| |
| DCHECK_EQ(txn->get(recycle_key, &recycle_val), TxnErrorCode::TXN_OK); |
| DCHECK((txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) || |
| (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE)); |
| |
| txn->remove(index_key); |
| key_size += index_key.size(); |
| |
| txn->remove(info_key); |
| key_size += info_key.size(); |
| |
| txn->remove(recycle_key); |
| key_size += recycle_key.size(); |
| clean_txn_ids.push_back(txn_id); |
| LOG(INFO) << "remove index_key=" << hex(index_key) << " info_key=" << hex(info_key) |
| << " recycle_key=" << hex(recycle_key); |
| } |
| if (label_pb.txn_ids().size() == clean_txn_ids.size()) { |
| txn->remove(label_key); |
| key_size += label_key.size(); |
| LOG(INFO) << "remove label_key=" << hex(label_key); |
| } else { |
| label_pb.clear_txn_ids(); |
| for (auto txn_id : survival_txn_ids) { |
| label_pb.add_txn_ids(txn_id); |
| } |
| LOG(INFO) << "rewrite label_pb=" << label_pb.ShortDebugString(); |
| label_val.clear(); |
| if (!label_pb.SerializeToString(&label_val)) { |
| LOG(INFO) << "failed to serialize label_pb=" << label_pb.ShortDebugString() |
| << " label_key=" << hex(label_key); |
| return TxnErrorCode::TXN_UNIDENTIFIED_ERROR; |
| } |
| txn->atomic_set_ver_value(label_key, label_val); |
| key_size += label_key.size(); |
| val_size += label_val.size(); |
| } |
| |
| err = txn->commit(); |
| TEST_SYNC_POINT_CALLBACK("internal_clean_label:err", &err); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(INFO) << fmt::format( |
| "label_key={} key_size={} val_size={} label_pb={} clean_txn_ids={}", hex(label_key), |
| key_size, val_size, label_pb.ShortDebugString(), fmt::join(clean_txn_ids, " ")); |
| } |
| return err; |
| } |
| |
| void MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* controller, |
| const CleanTxnLabelRequest* request, |
| CleanTxnLabelResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(clean_txn_label); |
| if (!request->has_db_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "missing db id"; |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "cannot find instance_id with cloud_unique_id=" |
| << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id); |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| RPC_RATE_LIMIT(clean_txn_label) |
| const int64_t db_id = request->db_id(); |
| |
| // clean label only by db_id |
| if (request->labels().empty()) { |
| std::string begin_label_key = txn_label_key({instance_id, db_id, ""}); |
| const std::string end_label_key = txn_label_key({instance_id, db_id + 1, ""}); |
| |
| std::unique_ptr<RangeGetIterator> it; |
| bool snapshot = true; |
| int limit = 1000; |
| TEST_SYNC_POINT_CALLBACK("clean_txn_label:limit", &limit); |
| do { |
| std::unique_ptr<Transaction> txn; |
| auto err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| msg = "failed to create txn"; |
| code = cast_as<ErrCategory::CREATE>(err); |
| LOG(INFO) << msg << " err=" << err << " begin=" << hex(begin_label_key) |
| << " end=" << hex(end_label_key); |
| return; |
| } |
| |
| err = txn->get(begin_label_key, end_label_key, &it, snapshot, limit); |
| if (err != TxnErrorCode::TXN_OK) { |
| msg = fmt::format("failed to txn range get, err={}", err); |
| code = cast_as<ErrCategory::READ>(err); |
| LOG(WARNING) << msg << " begin=" << hex(begin_label_key) |
| << " end=" << hex(end_label_key); |
| return; |
| } |
| |
| if (!it->has_next()) { |
| LOG(INFO) << "no keys in the range. begin=" << hex(begin_label_key) |
| << " end=" << hex(end_label_key); |
| break; |
| } |
| while (it->has_next()) { |
| auto [k, v] = it->next(); |
| if (!it->has_next()) { |
| begin_label_key = k; |
| LOG(INFO) << "iterator has no more kvs. key=" << hex(k); |
| } |
| err = internal_clean_label(txn_kv_, instance_id, db_id, k); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to clean txn label. err={}", err); |
| LOG(WARNING) << msg << " db_id=" << db_id; |
| return; |
| } |
| } |
| begin_label_key.push_back('\x00'); |
| } while (it->more()); |
| } else { |
| const std::string& label = request->labels(0); |
| const std::string label_key = txn_label_key({instance_id, db_id, label}); |
| TxnErrorCode err = internal_clean_label(txn_kv_, instance_id, db_id, label_key); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to clean txn label. err={}", err); |
| LOG(WARNING) << msg << " db_id=" << db_id << " label_key=" << hex(label_key); |
| return; |
| } |
| } |
| |
| code = MetaServiceCode::OK; |
| } |
| |
| // get txn id by label |
| // 1. When the requested status is not empty, return the txnid |
| // corresponding to the status. There may be multiple |
| // requested status, just match one. |
| // 2. When the requested status is empty, return the latest txnid. |
| void MetaServiceImpl::get_txn_id(::google::protobuf::RpcController* controller, |
| const GetTxnIdRequest* request, GetTxnIdResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_txn_id); |
| if (!request->has_db_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "missing db id"; |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "cannot find instance_id with cloud_unique_id=" |
| << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id); |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| RPC_RATE_LIMIT(get_txn_id) |
| const int64_t db_id = request->db_id(); |
| std::string label = request->label(); |
| const std::string label_key = txn_label_key({instance_id, db_id, label}); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" << db_id |
| << " label=" << label; |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label |
| << " db_id=" << db_id; |
| msg = ss.str(); |
| return; |
| } |
| |
| std::string label_val; |
| err = txn->get(label_key, &label_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "txn->get failed(), err=" << err << " label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (label_val.size() <= VERSION_STAMP_LEN) { |
| code = MetaServiceCode::TXN_ID_NOT_FOUND; |
| ss << "transaction not found, label=" << label; |
| return; |
| } |
| |
| TxnLabelPB label_pb; |
| //label_val.size() > VERSION_STAMP_LEN means label has previous txn ids. |
| if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "label_pb->ParseFromString() failed, label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| if (label_pb.txn_ids_size() == 0) { |
| code = MetaServiceCode::TXN_ID_NOT_FOUND; |
| ss << "transaction not found, label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| |
| // find the latest txn |
| if (request->txn_status_size() == 0) { |
| response->set_txn_id(*label_pb.txn_ids().rbegin()); |
| return; |
| } |
| |
| for (auto& cur_txn_id : label_pb.txn_ids()) { |
| const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id}); |
| std::string cur_info_val; |
| err = txn->get(cur_info_key, &cur_info_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label |
| << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| //label_to_idx and txn info inconsistency. |
| code = MetaServiceCode::TXN_ID_NOT_FOUND; |
| ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label |
| << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| TxnInfoPB cur_txn_info; |
| if (!cur_txn_info.ParseFromString(cur_info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id |
| << " label=" << label << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString(); |
| for (auto txn_status : request->txn_status()) { |
| if (cur_txn_info.status() == txn_status) { |
| response->set_txn_id(cur_txn_id); |
| return; |
| } |
| } |
| } |
| code = MetaServiceCode::TXN_ID_NOT_FOUND; |
| ss << "transaction not found, label=" << label; |
| msg = ss.str(); |
| return; |
| } |
| |
| } // namespace doris::cloud |