blob: 34d764a1324b3072038f8c1f4bba9fff2c5e0f56 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "txn_lazy_committer.h"
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <cerrno>
#include <chrono>
#include <random>
#include "common/bvars.h"
#include "common/config.h"
#include "common/defer.h"
#include "common/logging.h"
#include "common/stats.h"
#include "common/sync_executor.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/meta_service_tablet_stats.h"
#include "meta-store/blob_message.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
#include "resource-manager/resource_manager.h"
using namespace std::chrono;
namespace doris::cloud {
// Forward declarations of helpers that are called below but are not defined in this file.
// NOTE(review): the descriptions here are inferred from the names and from the call sites in
// this file; confirm them against the actual definitions.

// Presumably resolves the db id owning `txn_id` into `*db_id`; the outcome is reported through
// `code`/`msg`. Called from TxnLazyCommitTask::commit() with a null `stats`.
void get_txn_db_id(TxnKv* txn_kv, const std::string& instance_id, int64_t txn_id,
MetaServiceCode& code, std::string& msg, int64_t* db_id, KVStats* stats);
// Presumably collects the (key, RowsetMetaCloudPB) pairs of all tmp rowsets of `txn_id` into
// `*tmp_rowsets_meta`; the outcome is reported through `code`/`msg`. Called from
// TxnLazyCommitTask::commit() with a null `stats`.
void scan_tmp_rowset(
const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv> txn_kv,
MetaServiceCode& code, std::string& msg,
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>* tmp_rowsets_meta,
KVStats* stats);
// Presumably applies the accumulated `stats` delta to the tablet identified by `info` inside
// `txn`; the outcome is reported through `code`/`msg`. Called from convert_tmp_rowsets().
void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stats,
std::unique_ptr<Transaction>& txn, MetaServiceCode& code,
std::string& msg);
// Produce the seed used to shuffle partition ids.
//
// A non-zero `config::txn_lazy_commit_shuffle_seed` is returned as-is; otherwise the seed is the
// current high_resolution_clock time since epoch, expressed in nanoseconds.
static uint64_t generate_unique_seed() {
    if (const auto configured_seed = config::txn_lazy_commit_shuffle_seed; configured_seed != 0) {
        return configured_seed;
    }
    const auto since_epoch = high_resolution_clock::now().time_since_epoch();
    return duration_cast<nanoseconds>(since_epoch).count();
}
// Fetch and decode the partition version record through an existing transaction.
//
// - is_versioned_read == true : the versioned partition version key is read with
//   versioned_get(), which is also handed `versionstamp`. A missing key is reported as
//   TXN_ID_NOT_FOUND.
// - is_versioned_read == false: the plain partition version key (built from db_id, table_id and
//   partition_id) is read with Transaction::get(); `versionstamp` is not touched on this path.
//
// Returns {OK, ""} with `*partition_version` parsed from the stored value, or an error code
// together with a descriptive message.
static std::pair<MetaServiceCode, std::string> get_partition_version(
        Transaction* txn, std::string_view instance_id, int64_t txn_id, int64_t db_id,
        int64_t table_id, int64_t partition_id, VersionPB* partition_version,
        Versionstamp* versionstamp, bool is_versioned_read) {
    std::string key;
    std::string raw_version;

    if (!is_versioned_read) {
        key = partition_version_key({instance_id, db_id, table_id, partition_id});
        const TxnErrorCode rc = txn->get(key, &raw_version);
        if (rc != TxnErrorCode::TXN_OK) {
            return {cast_as<ErrCategory::READ>(rc),
                    fmt::format("failed to get partition version, txn_id={}, key={} err={}, "
                                "db_id={}, table_id={}, partition_id={}",
                                txn_id, hex(key), rc, db_id, table_id, partition_id)};
        }
    } else {
        key = versioned::partition_version_key({instance_id, partition_id});
        const TxnErrorCode rc = versioned_get(txn, key, versionstamp, &raw_version);
        if (rc != TxnErrorCode::TXN_OK) {
            const bool key_missing = (rc == TxnErrorCode::TXN_KEY_NOT_FOUND);
            return {key_missing ? MetaServiceCode::TXN_ID_NOT_FOUND
                                : cast_as<ErrCategory::READ>(rc),
                    fmt::format("failed to get partition version, txn_id={}, key={} err={}",
                                txn_id, hex(key), rc)};
        }
    }

    if (partition_version->ParseFromString(raw_version)) {
        return {MetaServiceCode::OK, ""};
    }
    return {MetaServiceCode::PROTOBUF_PARSE_ERR,
            fmt::format("failed to parse partition version pb, txn_id={}, key={}", txn_id,
                        hex(key))};
}
// Convenience overload: opens a fresh transaction on `txn_kv` and performs the partition version
// read through it. The transaction is only used for reading and is dropped when the function
// returns.
static std::pair<MetaServiceCode, std::string> get_partition_version(
        TxnKv* txn_kv, std::string_view instance_id, int64_t txn_id, int64_t db_id,
        int64_t table_id, int64_t partition_id, VersionPB* partition_version,
        Versionstamp* versionstamp, bool is_versioned_read) {
    std::unique_ptr<Transaction> read_txn;
    if (const TxnErrorCode rc = txn_kv->create_txn(&read_txn); rc != TxnErrorCode::TXN_OK) {
        return {cast_as<ErrCategory::CREATE>(rc),
                fmt::format("failed to create txn, txn_id={}, err={}", txn_id, rc)};
    }
    return get_partition_version(read_txn.get(), instance_id, txn_id, db_id, table_id,
                                 partition_id, partition_version, versionstamp,
                                 is_versioned_read);
}
// Read and decode the TxnInfoPB stored under the txn info key of (instance_id, db_id, txn_id),
// using a fresh transaction created on `txn_kv`.
//
// Returns {OK, ""} with `*txn_info` filled in; a missing key is reported as TXN_ID_NOT_FOUND,
// other failures carry the corresponding error code and a descriptive message.
static std::pair<MetaServiceCode, std::string> get_txn_info(TxnKv* txn_kv,
                                                            const std::string& instance_id,
                                                            int64_t db_id, int64_t txn_id,
                                                            TxnInfoPB* txn_info) {
    const std::string key = txn_info_key({instance_id, db_id, txn_id});

    std::unique_ptr<Transaction> read_txn;
    if (const TxnErrorCode rc = txn_kv->create_txn(&read_txn); rc != TxnErrorCode::TXN_OK) {
        return {cast_as<ErrCategory::CREATE>(rc),
                fmt::format("failed to create txn, txn_id={}, err={}", txn_id, rc)};
    }

    std::string raw_info;
    const TxnErrorCode rc = read_txn->get(key, &raw_info);
    if (rc != TxnErrorCode::TXN_OK) {
        const bool key_missing = (rc == TxnErrorCode::TXN_KEY_NOT_FOUND);
        return {key_missing ? MetaServiceCode::TXN_ID_NOT_FOUND : cast_as<ErrCategory::READ>(rc),
                fmt::format("failed to get txn info, key={}, err={}", hex(key), rc)};
    }

    if (txn_info->ParseFromString(raw_info)) {
        return {MetaServiceCode::OK, ""};
    }
    return {MetaServiceCode::PROTOBUF_PARSE_ERR,
            fmt::format("failed to parse txn info pb, key={}", hex(key))};
}
// Resolve the TabletIndexPB of `tablet_id` through an existing transaction.
//
// - is_versioned_read == true : delegated to `meta_reader.get_tablet_index()`.
// - is_versioned_read == false: the meta tablet index key is read with the third argument of
//   Transaction::get() set to true, and the value is parsed into `*tablet_idx`.
//
// Returns {OK, ""} on success. A missing key is reported as TXN_ID_NOT_FOUND; read failures are
// also logged at WARNING level.
static std::pair<MetaServiceCode, std::string> get_tablet_index(
        CloneChainReader& meta_reader, Transaction* txn, std::string_view instance_id,
        int64_t txn_id, int64_t tablet_id, TabletIndexPB* tablet_idx, bool is_versioned_read) {
    std::stringstream err_stream;
    // Shared mapping from a failed kv read to the code handed back to the caller.
    const auto read_failure_code = [](TxnErrorCode rc) -> MetaServiceCode {
        return rc == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                     : cast_as<ErrCategory::READ>(rc);
    };

    if (is_versioned_read) {
        const TxnErrorCode rc = meta_reader.get_tablet_index(txn, tablet_id, tablet_idx);
        if (rc == TxnErrorCode::TXN_OK) {
            return {MetaServiceCode::OK, std::string()};
        }
        const MetaServiceCode failure = read_failure_code(rc);
        err_stream << "failed to get tablet index, txn_id=" << txn_id
                   << " tablet_id=" << tablet_id << " err=" << rc;
        std::string failure_msg = err_stream.str();
        LOG(WARNING) << failure_msg;
        return {failure, failure_msg};
    }

    const std::string idx_key = meta_tablet_idx_key({instance_id, tablet_id});
    std::string idx_val;
    const TxnErrorCode rc = txn->get(idx_key, &idx_val, true);
    if (rc != TxnErrorCode::TXN_OK) {
        const MetaServiceCode failure = read_failure_code(rc);
        err_stream << "failed to get tablet idx, txn_id=" << txn_id << " key=" << hex(idx_key)
                   << " err=" << rc;
        std::string failure_msg = err_stream.str();
        LOG(WARNING) << failure_msg;
        return {failure, failure_msg};
    }
    if (!tablet_idx->ParseFromString(idx_val)) {
        err_stream << "failed to parse tablet idx pb txn_id=" << txn_id
                   << " key=" << hex(idx_key);
        return {MetaServiceCode::PROTOBUF_PARSE_ERR, err_stream.str()};
    }
    return {MetaServiceCode::OK, std::string()};
}
// Convenience overload: opens a fresh transaction on `txn_kv` and resolves the tablet index
// through it. The transaction is only used for reading and is dropped on return.
static std::pair<MetaServiceCode, std::string> get_tablet_index(
        CloneChainReader& meta_reader, TxnKv* txn_kv, std::string_view instance_id, int64_t txn_id,
        int64_t tablet_id, TabletIndexPB* tablet_idx, bool is_versioned_read) {
    std::unique_ptr<Transaction> read_txn;
    if (const TxnErrorCode rc = txn_kv->create_txn(&read_txn); rc != TxnErrorCode::TXN_OK) {
        return {cast_as<ErrCategory::CREATE>(rc),
                fmt::format("failed to create txn, txn_id={}, err={}", txn_id, rc)};
    }
    return get_tablet_index(meta_reader, read_txn.get(), instance_id, txn_id, tablet_id,
                            tablet_idx, is_versioned_read);
}
// Convert one batch of tmp rowsets of txn `txn_id` into rowset metas, inside a single kv
// transaction that is committed at the end of this function.
//
// For every (tmp_rowset_key, RowsetMetaCloudPB) pair in `tmp_rowsets_meta` the function:
//   - skips the entry if the tmp rowset key no longer exists;
//   - resolves the tablet index (cached in `tablet_ids`, which is updated for the caller) and
//     the partition version (cached locally);
//   - returns early, without committing, if `txn_id` is not the first pending txn of the
//     partition version;
//   - skips the entry if a rowset already exists at the target version;
//   - otherwise stamps the target version and the visible timestamp on the meta, writes it under
//     the rowset key (plus the versioned load key when `is_versioned_write`) and accumulates
//     per-tablet stats.
// Afterwards the tablet stats are updated (plus the versioned stats when `is_versioned_write`),
// the pending delete bitmap keys of the touched tablets are removed when
// `defer_deleting_pending_delete_bitmaps` is set, and the transaction is committed.
//
// The elements of `tmp_rowsets_meta` are modified in place (start/end version, visible ts).
// Errors are reported through `code` and `msg`; `code` is not written on the success paths.
void convert_tmp_rowsets(
const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv> txn_kv,
MetaServiceCode& code, std::string& msg, int64_t db_id,
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta,
std::map<int64_t, TabletIndexPB>& tablet_ids, bool is_versioned_write,
bool is_versioned_read, Versionstamp versionstamp, ResourceManager* resource_mgr,
bool defer_deleting_pending_delete_bitmaps) {
std::stringstream ss;
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
CloneChainReader meta_reader(instance_id, txn_kv.get(), resource_mgr);
// partition_id -> VersionPB
std::unordered_map<int64_t, VersionPB> partition_versions;
// tablet_id -> stats
std::unordered_map<int64_t, TabletStats> tablet_stats;
// For versioned writes, load the operation log stored at `versionstamp`; its min_timestamp may
// be lowered further below.
OperationLogPB op_log;
if (is_versioned_write) {
std::string log_key = encode_versioned_key(versioned::log_key({instance_id}), versionstamp);
ValueBuf log_value;
err = blob_get(txn.get(), log_key, &log_value);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get operation log, txn_id=" << txn_id << " key=" << hex(log_key)
<< " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
if (!log_value.to_pb(&op_log)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse operation log pb, txn_id=" << txn_id << " key=" << hex(log_key);
msg = ss.str();
LOG(WARNING) << msg;
return;
}
}
// All rowsets converted by this call share one visible timestamp (wall clock, milliseconds).
int64_t rowsets_visible_ts_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowsets_meta) {
// Re-read the tmp rowset key inside this transaction; only its existence matters here, the
// value read is not used.
std::string tmp_rowst_data;
err = txn->get(tmp_rowset_key, &tmp_rowst_data);
if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
// the tmp rowset has been converted
VLOG_DEBUG << "tmp rowset has been converted, key=" << hex(tmp_rowset_key);
continue;
}
if (TxnErrorCode::TXN_OK != err) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get tmp_rowset_key, txn_id=" << txn_id
<< " key=" << hex(tmp_rowset_key) << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
// Resolve the tablet index once per tablet; the result is cached in the caller-owned map.
if (!tablet_ids.contains(tmp_rowset_pb.tablet_id())) {
TabletIndexPB tablet_idx_pb;
std::tie(code, msg) =
get_tablet_index(meta_reader, txn.get(), instance_id, txn_id,
tmp_rowset_pb.tablet_id(), &tablet_idx_pb, is_versioned_read);
if (code != MetaServiceCode::OK) {
return;
}
tablet_ids.emplace(tmp_rowset_pb.tablet_id(), tablet_idx_pb);
}
const TabletIndexPB& tablet_idx_pb = tablet_ids[tmp_rowset_pb.tablet_id()];
// Resolve the partition version once per partition.
if (!partition_versions.contains(tmp_rowset_pb.partition_id())) {
VersionPB version_pb;
if (!is_versioned_read) {
std::string ver_val;
std::string ver_key =
partition_version_key({instance_id, db_id, tablet_idx_pb.table_id(),
tmp_rowset_pb.partition_id()});
err = txn->get(ver_key, &ver_val);
if (TxnErrorCode::TXN_OK != err) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get partiton version, txn_id=" << txn_id
<< " key=" << hex(ver_key) << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
if (!version_pb.ParseFromString(ver_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse version pb txn_id=" << txn_id << " key=" << hex(ver_key);
msg = ss.str();
return;
}
LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key)
<< " version_pb:" << version_pb.ShortDebugString();
} else {
err = meta_reader.get_partition_version(txn.get(), tmp_rowset_pb.partition_id(),
&version_pb, nullptr);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get partition version, txn_id=" << txn_id
<< " partition_id=" << tmp_rowset_pb.partition_id() << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
LOG(INFO) << "txn_id=" << txn_id << " partition_id=" << tmp_rowset_pb.partition_id()
<< " version_pb:" << version_pb.ShortDebugString();
}
// If this txn is no longer the first pending txn of the partition, stop here: the function
// returns without committing and without modifying `code`.
if (version_pb.pending_txn_ids_size() == 0 || version_pb.pending_txn_ids(0) != txn_id) {
LOG(INFO) << "txn_id=" << txn_id << " partition_id=" << tmp_rowset_pb.partition_id()
<< " tmp_rowset_key=" << hex(tmp_rowset_key)
<< " version has already been converted."
<< " version_pb:" << version_pb.ShortDebugString();
TEST_SYNC_POINT_CALLBACK("convert_tmp_rowsets::already_been_converted",
&version_pb);
return;
}
partition_versions.emplace(tmp_rowset_pb.partition_id(), version_pb);
// Debug-only check that a single call handles rowsets of exactly one partition.
DCHECK_EQ(partition_versions.size(), 1) << partition_versions.size();
}
const VersionPB& version_pb = partition_versions[tmp_rowset_pb.partition_id()];
DCHECK(version_pb.pending_txn_ids_size() > 0);
DCHECK_EQ(version_pb.pending_txn_ids(0), txn_id);
// Target version of the converted rowset: current partition version + 1, or 2 when the
// partition has no version yet.
int64_t version = version_pb.has_version() ? (version_pb.version() + 1) : 2;
std::string rowset_key = meta_rowset_key({instance_id, tmp_rowset_pb.tablet_id(), version});
std::string rowset_val;
// Skip the entry if a rowset already exists at the target version.
if (!is_versioned_read) {
err = txn->get(rowset_key, &rowset_val);
if (TxnErrorCode::TXN_OK == err) {
// tmp rowset key has been converted
continue;
}
if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get rowset_key, txn_id=" << txn_id << " key=" << hex(rowset_key)
<< " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
DCHECK(err == TxnErrorCode::TXN_KEY_NOT_FOUND);
} else {
int64_t tablet_id = tmp_rowset_pb.tablet_id();
RowsetMetaCloudPB rowset_val_pb;
err = meta_reader.get_load_rowset_meta(txn.get(), tablet_id, version, &rowset_val_pb);
if (TxnErrorCode::TXN_OK == err) {
// tmp rowset key has been converted
continue;
}
if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get load_rowset_key, txn_id=" << txn_id
<< " tablet_id=" << tablet_id << " version=" << version << " err=" << err;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
DCHECK(err == TxnErrorCode::TXN_KEY_NOT_FOUND);
}
// Stamp the final version range and visible time on the meta (this mutates the caller's
// element) and write it under the rowset key.
tmp_rowset_pb.set_start_version(version);
tmp_rowset_pb.set_end_version(version);
tmp_rowset_pb.set_visible_ts_ms(rowsets_visible_ts_ms);
rowset_val.clear();
if (!tmp_rowset_pb.SerializeToString(&rowset_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize rowset_meta, txn_id=" << txn_id
<< " key=" << hex(rowset_key);
msg = ss.str();
return;
}
txn->put(rowset_key, rowset_val);
LOG(INFO) << "put rowset_key=" << hex(rowset_key) << " txn_id=" << txn_id
<< " rowset_size=" << rowset_key.size() + rowset_val.size();
// Accumulate affected rows
auto& stats = tablet_stats[tmp_rowset_pb.tablet_id()];
stats.data_size += tmp_rowset_pb.total_disk_size();
stats.num_rows += tmp_rowset_pb.num_rows();
++stats.num_rowsets;
stats.num_segs += tmp_rowset_pb.num_segments();
stats.index_size += tmp_rowset_pb.index_disk_size();
stats.segment_size += tmp_rowset_pb.data_disk_size();
if (is_versioned_write) {
// If this is a versioned write, we need to put the rowset with versionstamp
RowsetMetaCloudPB copied_rowset_meta(tmp_rowset_pb);
std::string rowset_load_key = versioned::meta_rowset_load_key(
{instance_id, tmp_rowset_pb.tablet_id(), version});
if (!versioned::document_put(txn.get(), rowset_load_key, versionstamp,
std::move(copied_rowset_meta))) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize rowset_meta, txn_id=" << txn_id
<< " key=" << hex(rowset_load_key);
msg = ss.str();
return;
}
LOG(INFO) << "put versioned rowset_key=" << hex(rowset_load_key)
<< " txn_id=" << txn_id;
}
}
// Batch get existing versioned tablet stats if needed
std::unordered_map<int64_t, TabletStatsPB> existing_versioned_stats;
if (is_versioned_write && !tablet_stats.empty()) {
internal_get_load_tablet_stats_batch(code, msg, meta_reader, txn.get(), instance_id,
tablet_ids, &existing_versioned_stats);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "batch get versioned tablet stats failed, code=" << code
<< " msg=" << msg << " txn_id=" << txn_id;
return;
}
LOG(INFO) << "batch get " << existing_versioned_stats.size()
<< " versioned tablet stats, txn_id=" << txn_id;
// If the reader reports a min read versionstamp lower than the min_timestamp recorded in the
// operation log, rewrite the log (at the same versionstamp) with the lower value.
int64_t min_timestamp = meta_reader.min_read_versionstamp().version();
int64_t old_min_timestamp = op_log.min_timestamp();
if (min_timestamp < op_log.min_timestamp()) {
op_log.set_min_timestamp(min_timestamp);
std::string log_key_prefix = versioned::log_key({instance_id});
versioned::blob_put(txn.get(), log_key_prefix, versionstamp, op_log);
LOG(INFO) << "update operation log min_timestamp from " << old_min_timestamp << " to "
<< min_timestamp << " txn_id=" << txn_id
<< " log_versionstamp=" << versionstamp.to_string();
}
}
// Apply the accumulated per-tablet stats.
for (auto& [tablet_id, stats] : tablet_stats) {
DCHECK(tablet_ids.count(tablet_id));
auto& tablet_idx = tablet_ids[tablet_id];
StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_id};
update_tablet_stats(info, stats, txn, code, msg);
if (code != MetaServiceCode::OK) return;
if (is_versioned_write) {
// Start from the stats fetched above (a default TabletStatsPB if none was returned for this
// tablet) and merge in the delta accumulated by this call.
TabletStatsPB stats_pb = existing_versioned_stats[tablet_id];
merge_tablet_stats(stats_pb, stats);
std::string stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id});
// put with specified versionstamp
if (!versioned::document_put(txn.get(), stats_key, versionstamp, std::move(stats_pb))) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = "failed to serialize versioned tablet stats";
LOG(WARNING) << msg << " tablet_id=" << tablet_id << " txn_id=" << txn_id;
return;
}
LOG(INFO) << "put versioned tablet stats key=" << hex(stats_key)
<< " tablet_id=" << tablet_id << " txn_id=" << txn_id;
}
}
// When requested, remove the pending delete bitmap key of every tablet that received a rowset
// in this call.
if (defer_deleting_pending_delete_bitmaps) {
for (auto& [tablet_id, _] : tablet_stats) {
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id});
txn->remove(pending_key);
LOG(INFO) << "remove delete bitmap pending key, pending_key=" << hex(pending_key)
<< " txn_id=" << txn_id;
}
}
TEST_SYNC_POINT_RETURN_WITH_VOID("convert_tmp_rowsets::before_commit", &code);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
msg = ss.str();
return;
}
}
// Mark a committed txn as visible inside a single kv transaction.
//
// The following happens only when the stored txn info is in COMMITTED state:
//   1. the txn info is rewritten with status VISIBLE;
//   2. the txn running key is removed;
//   3. if `defer_deleting_pending_delete_bitmaps` is set, the delete bitmap update lock of every
//      table of the txn is removed, but only when the lock is still owned by `txn_id`
//      (best effort: problems reading or parsing a lock are logged and do not fail the call);
//   4. unless the txn uses versioned write, the recycle txn key is written;
//   5. the kv transaction is committed.
// If the txn info is in any other state nothing is written and `code`/`msg` are left untouched.
// On failure `code` and `msg` describe the error.
void make_committed_txn_visible(const std::string& instance_id, int64_t db_id, int64_t txn_id,
                                std::shared_ptr<TxnKv> txn_kv, MetaServiceCode& code,
                                std::string& msg, bool defer_deleting_pending_delete_bitmaps) {
    std::stringstream ss;
    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }

    std::string info_val;
    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
    err = txn->get(info_key, &info_val);
    if (err != TxnErrorCode::TXN_OK) {
        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                      : cast_as<ErrCategory::READ>(err);
        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }

    TxnInfoPB txn_info;
    if (!txn_info.ParseFromString(info_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse txn_info, txn_id=" << txn_id;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }
    VLOG_DEBUG << "txn_info:" << txn_info.ShortDebugString();
    DCHECK((txn_info.status() == TxnStatusPB::TXN_STATUS_COMMITTED) ||
           (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE));

    if (txn_info.status() != TxnStatusPB::TXN_STATUS_COMMITTED) {
        // Already visible: nothing to do. Any other state is unexpected; the DCHECK above only
        // fires in debug builds, so leave a trace for release builds as well.
        if (txn_info.status() != TxnStatusPB::TXN_STATUS_VISIBLE) {
            LOG(WARNING) << "unexpected txn status when making txn visible, txn_id=" << txn_id
                         << " txn_info=" << txn_info.ShortDebugString();
        }
        return;
    }

    // 1. visible txn info
    txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
    if (!txn_info.SerializeToString(&info_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
        msg = ss.str();
        return;
    }
    txn->put(info_key, info_val);
    LOG(INFO) << "put info_key=" << hex(info_key) << " txn_id=" << txn_id;

    // 2. remove running key
    const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
    LOG(INFO) << "remove running_key=" << hex(running_key) << " txn_id=" << txn_id;
    txn->remove(running_key);

    // Remove delete bitmap locks if deferring deletion
    if (defer_deleting_pending_delete_bitmaps) {
        for (int64_t table_id : txn_info.table_ids()) {
            std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
            // Read the lock first to check if it still belongs to the current txn.
            // A dedicated variable is used so that the function-level `err` is not shadowed.
            std::string lock_val;
            const TxnErrorCode lock_err = txn->get(lock_key, &lock_val);
            if (lock_err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                continue;
            }
            if (lock_err != TxnErrorCode::TXN_OK) {
                LOG(WARNING) << "failed to get delete bitmap lock, lock_key=" << hex(lock_key)
                             << " table_id=" << table_id << " err=" << lock_err;
                continue;
            }
            DeleteBitmapUpdateLockPB lock_info;
            if (!lock_info.ParseFromString(lock_val)) {
                // Keep the lock untouched, but do not hide the corruption.
                LOG(WARNING) << "failed to parse delete bitmap lock, lock_key=" << hex(lock_key)
                             << " table_id=" << table_id << " txn_id=" << txn_id;
                continue;
            }
            // Only remove the lock if it still belongs to the current txn
            if (lock_info.lock_id() == txn_id) {
                txn->remove(lock_key);
                LOG(INFO) << "remove delete bitmap lock, lock_key=" << hex(lock_key)
                          << " table_id=" << table_id << " txn_id=" << txn_id;
            } else {
                LOG(WARNING) << "delete bitmap lock is held by another txn, "
                             << "lock_key=" << hex(lock_key) << " table_id=" << table_id
                             << " expected_txn_id=" << txn_id
                             << " actual_lock_id=" << lock_info.lock_id();
            }
        }
    }

    // The recycle txn pb will be written when recycle the commit txn log,
    // if the txn versioned write is enabled.
    if (!txn_info.versioned_write()) {
        std::string recycle_val;
        std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
        RecycleTxnPB recycle_pb;
        auto now_time = system_clock::now();
        uint64_t visible_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
        recycle_pb.set_creation_time(visible_time);
        recycle_pb.set_label(txn_info.label());
        if (!recycle_pb.SerializeToString(&recycle_val)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
            msg = ss.str();
            return;
        }
        txn->put(recycle_key, recycle_val);
        LOG(INFO) << "put recycle_key=" << hex(recycle_key) << " txn_id=" << txn_id;
    }

    TEST_SYNC_POINT_RETURN_WITH_VOID("TxnLazyCommitTask::make_committed_txn_visible::commit",
                                     &code);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        return;
    }
}
TxnLazyCommitTask::TxnLazyCommitTask(const std::string& instance_id, int64_t txn_id,
std::shared_ptr<TxnKv> txn_kv,
TxnLazyCommitter* txn_lazy_committer)
: instance_id_(instance_id),
txn_id_(txn_id),
txn_kv_(txn_kv),
txn_lazy_committer_(txn_lazy_committer) {
DCHECK(txn_id > 0);
}
void TxnLazyCommitTask::commit() {
StopWatch sw;
DORIS_CLOUD_DEFER {
{
std::unique_lock lock(mutex_);
this->finished_ = true;
}
this->cond_.notify_all();
g_bvar_txn_lazy_committer_committing_duration << sw.elapsed_us();
};
int64_t db_id;
get_txn_db_id(txn_kv_.get(), instance_id_, txn_id_, code_, msg_, &db_id, nullptr);
if (code_ != MetaServiceCode::OK) {
LOG(WARNING) << "get_txn_db_id failed, txn_id=" << txn_id_ << " code=" << code_
<< " msg=" << msg_;
return;
}
TxnInfoPB txn_info;
std::tie(code_, msg_) = get_txn_info(txn_kv_.get(), instance_id_, db_id, txn_id_, &txn_info);
if (code_ != MetaServiceCode::OK) {
LOG(WARNING) << "get_txn_info failed, txn_id=" << txn_id_ << " code=" << code_
<< " msg=" << msg_;
return;
}
if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
// The txn has been committed and made visible, no need to commit again.
LOG(INFO) << "txn_id=" << txn_id_ << " is already visible, skip commit";
return;
}
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
// The txn has been aborted, no need to commit again.
code_ = MetaServiceCode::TXN_ALREADY_ABORTED;
LOG(INFO) << "txn_id=" << txn_id_ << " is already aborted, skip commit";
return;
}
bool defer_deleting_pending_delete_bitmaps = txn_info.defer_deleting_pending_delete_bitmaps();
bool is_versioned_write = txn_info.versioned_write();
bool is_versioned_read = txn_info.versioned_read();
CloneChainReader meta_reader(instance_id_, txn_kv_.get(),
txn_lazy_committer_->resource_manager().get());
std::stringstream ss;
int retry_times = 0;
do {
LOG(INFO) << "lazy task commit txn_id=" << txn_id_ << " retry_times=" << retry_times;
do {
code_ = MetaServiceCode::OK;
msg_.clear();
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> all_tmp_rowset_metas;
scan_tmp_rowset(instance_id_, txn_id_, txn_kv_, code_, msg_, &all_tmp_rowset_metas,
nullptr);
if (code_ != MetaServiceCode::OK) {
LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id_ << " code=" << code_;
break;
}
VLOG_DEBUG << "txn_id=" << txn_id_
<< " tmp_rowset_metas.size()=" << all_tmp_rowset_metas.size();
if (all_tmp_rowset_metas.size() == 0) {
LOG(INFO) << "empty tmp_rowset_metas, txn_id=" << txn_id_;
}
// <partition_id, tmp_rowsets>
std::map<int64_t, std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>>
partition_to_tmp_rowset_metas;
for (auto& [tmp_rowset_key, tmp_rowset_pb] : all_tmp_rowset_metas) {
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].emplace_back();
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().first =
tmp_rowset_key;
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().second =
tmp_rowset_pb;
}
// Shuffle partition ids to reduce the conflict probability
std::vector<int64_t> partition_ids;
for (const auto& [partition_id, _] : partition_to_tmp_rowset_metas) {
partition_ids.push_back(partition_id);
}
if (config::txn_lazy_commit_shuffle_partitions) {
std::mt19937 rng(generate_unique_seed());
std::shuffle(partition_ids.begin(), partition_ids.end(), rng);
}
if (config::enable_cloud_parallel_txn_lazy_commit) {
SyncExecutor<std::pair<MetaServiceCode, std::string>> executor(
txn_lazy_committer_->parallel_commit_pool(),
fmt::format("txn_{}_parallel_commit", txn_id_));
for (int64_t partition_id : partition_ids) {
executor.add([&, partition_id, this]() {
return commit_partition(db_id, partition_id,
partition_to_tmp_rowset_metas.at(partition_id),
is_versioned_read, is_versioned_write,
defer_deleting_pending_delete_bitmaps);
});
}
bool finished = false;
auto task_results = executor.when_all(&finished);
for (auto&& [code, msg] : task_results) {
if (code != MetaServiceCode::OK) {
code_ = code;
msg_ = std::move(msg);
break;
}
}
if (code_ == MetaServiceCode::OK && !finished) {
LOG(WARNING) << "some partition commit tasks not finished, txn_id=" << txn_id_;
code_ = MetaServiceCode::KV_TXN_CONFLICT;
break;
}
} else {
for (int64_t partition_id : partition_ids) {
std::tie(code_, msg_) = commit_partition(
db_id, partition_id, partition_to_tmp_rowset_metas[partition_id],
is_versioned_read, is_versioned_write,
defer_deleting_pending_delete_bitmaps);
if (code_ != MetaServiceCode::OK) break;
}
}
if (code_ != MetaServiceCode::OK) {
LOG(WARNING) << "txn_id=" << txn_id_ << " code=" << code_ << " msg=" << msg_;
break;
}
make_committed_txn_visible(instance_id_, db_id, txn_id_, txn_kv_, code_, msg_,
defer_deleting_pending_delete_bitmaps);
} while (false);
} while (code_ == MetaServiceCode::KV_TXN_CONFLICT &&
retry_times++ < config::txn_store_retry_times);
}
// Commit the tmp rowsets of one partition for txn `txn_id_`.
//
// Flow:
//   1. resolve the table id from the tablet index of the first tmp rowset;
//   2. read the partition version; if `txn_id_` is not its first pending txn the partition is
//      considered already converted and {OK, ""} is returned;
//   3. convert the tmp rowsets in batches through convert_tmp_rowsets(), one kv transaction per
//      batch, sized so that a batch stays well below the kv transaction size limit;
//   4. in a final kv transaction, re-read the partition version and, if `txn_id_` is still the
//      first pending txn, clear the pending txn, bump the version (first version is 2), write
//      the version key (plus the versioned key when `is_versioned_write`), remove the tmp rowset
//      keys and commit.
//
// `tmp_rowset_metas` must not be empty (checked in debug builds only).
// Returns {OK, ""} on success, or the failing code and message.
std::pair<MetaServiceCode, std::string> TxnLazyCommitTask::commit_partition(
        int64_t db_id, int64_t partition_id,
        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowset_metas,
        bool is_versioned_read, bool is_versioned_write,
        bool defer_deleting_pending_delete_bitmaps) {
    std::stringstream ss;
    CloneChainReader meta_reader(instance_id_, txn_kv_.get(),
                                 txn_lazy_committer_->resource_manager().get());
    MetaServiceCode code = MetaServiceCode::OK;
    std::string msg;
    StopWatch sw;
    DORIS_CLOUD_DEFER {
        g_bvar_txn_lazy_committer_commit_partition_duration << sw.elapsed_us();
    };

    // tablet_id -> TabletIndexPB
    std::map<int64_t, TabletIndexPB> tablet_ids;
    int64_t table_id = -1;
    {
        DCHECK(tmp_rowset_metas.size() > 0);
        int64_t first_tablet_id = tmp_rowset_metas.begin()->second.tablet_id();
        TabletIndexPB first_tablet_index;
        std::tie(code, msg) =
                get_tablet_index(meta_reader, txn_kv_.get(), instance_id_, txn_id_,
                                 first_tablet_id, &first_tablet_index, is_versioned_read);
        if (code != MetaServiceCode::OK) {
            return {code, msg};
        }
        table_id = first_tablet_index.table_id();
        tablet_ids.emplace(first_tablet_id, first_tablet_index);
    }

    // The partition version key is constructed during the txn commit process, so the versionstamp
    // can be retrieved from the key itself.
    Versionstamp versionstamp;
    VersionPB original_partition_version;
    std::tie(code, msg) = get_partition_version(txn_kv_.get(), instance_id_, txn_id_, db_id,
                                                table_id, partition_id,
                                                &original_partition_version, &versionstamp,
                                                is_versioned_read);
    if (code != MetaServiceCode::OK) {
        return {code, msg};
    } else if (original_partition_version.pending_txn_ids_size() == 0 ||
               original_partition_version.pending_txn_ids(0) != txn_id_) {
        // The partition version does not contain the target pending txn, it might have been committed
        LOG(INFO) << "txn_id=" << txn_id_ << " partition_id=" << partition_id
                  << " version has already been converted."
                  << " version_pb:" << original_partition_version.ShortDebugString();
        TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::already_been_converted",
                                 &original_partition_version);
        return {MetaServiceCode::OK, ""};
    }

    size_t max_rowset_meta_size = 0;
    for (const auto& [_, tmp_rowset_pb] : tmp_rowset_metas) {
        max_rowset_meta_size = std::max(max_rowset_meta_size, tmp_rowset_pb.ByteSizeLong());
    }

    // fdb txn limit 10MB, we use 4MB as the max size for each batch.
    size_t max_rowsets_per_batch = config::txn_lazy_max_rowsets_per_batch;
    if (max_rowset_meta_size > 0) {
        max_rowsets_per_batch = std::min((4UL << 20) / max_rowset_meta_size,
                                         size_t(config::txn_lazy_max_rowsets_per_batch));
        TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::max_rowsets_per_batch",
                                 &max_rowsets_per_batch, &max_rowset_meta_size);
    }
    // A single rowset meta larger than 4MB (or a zero config value) would yield a batch size of
    // 0 and the loop below would never advance; always convert at least one rowset per batch.
    max_rowsets_per_batch = std::max<size_t>(max_rowsets_per_batch, 1);

    for (size_t i = 0; i < tmp_rowset_metas.size(); i += max_rowsets_per_batch) {
        // `i < size()` holds here, so the subtraction cannot underflow and `end` cannot overflow.
        const size_t end = i + std::min(max_rowsets_per_batch, tmp_rowset_metas.size() - i);
        // convert_tmp_rowsets() mutates the metas it is given, hence the per-batch copy.
        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
                sub_partition_tmp_rowset_metas(tmp_rowset_metas.begin() + i,
                                               tmp_rowset_metas.begin() + end);
        convert_tmp_rowsets(instance_id_, txn_id_, txn_kv_, code, msg, db_id,
                            sub_partition_tmp_rowset_metas, tablet_ids, is_versioned_write,
                            is_versioned_read, versionstamp,
                            txn_lazy_committer_->resource_manager().get(),
                            defer_deleting_pending_delete_bitmaps);
        if (code != MetaServiceCode::OK) {
            return {code, msg};
        }
    }

    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, txn_id=" << txn_id_ << " err=" << err;
        msg = ss.str();
        LOG(WARNING) << msg;
        return {code, msg};
    }

    DCHECK(table_id > 0);
    DCHECK(partition_id > 0);

    // Notice: get the versionstamp again, since it must not be changed here.
    // But if you want to support multi pending txns in the partition version, this must be changed.
    VersionPB version_pb;
    std::tie(code, msg) =
            get_partition_version(txn.get(), instance_id_, txn_id_, db_id, table_id, partition_id,
                                  &version_pb, &versionstamp, is_versioned_read);
    if (code != MetaServiceCode::OK) {
        return {code, msg};
    }

    if (version_pb.pending_txn_ids_size() > 0 && version_pb.pending_txn_ids(0) == txn_id_) {
        DCHECK(version_pb.pending_txn_ids_size() == 1);
        version_pb.clear_pending_txn_ids();

        if (version_pb.has_version()) {
            version_pb.set_version(version_pb.version() + 1);
        } else {
            // first commit txn version is 2
            version_pb.set_version(2);
        }
        std::string ver_key = partition_version_key({instance_id_, db_id, table_id, partition_id});
        std::string ver_val;
        if (!version_pb.SerializeToString(&ver_val)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            ss << "failed to serialize version_pb when saving, txn_id=" << txn_id_;
            msg = ss.str();
            return {code, msg};
        }
        txn->put(ver_key, ver_val);
        LOG(INFO) << "put ver_key=" << hex(ver_key) << " txn_id=" << txn_id_
                  << " version_pb=" << version_pb.ShortDebugString();

        if (is_versioned_write) {
            // Update the partition version with the specified versionstamp.
            std::string versioned_key =
                    versioned::partition_version_key({instance_id_, partition_id});
            versioned_put(txn.get(), versioned_key, versionstamp, ver_val);
            LOG(INFO) << "put versioned ver_key=" << hex(versioned_key) << " txn_id=" << txn_id_
                      << " version_pb=" << version_pb.ShortDebugString();
        }

        for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas) {
            txn->remove(tmp_rowset_key);
            LOG(INFO) << "remove tmp_rowset_key=" << hex(tmp_rowset_key) << " txn_id=" << txn_id_;
        }

        TEST_SYNC_POINT_CALLBACK("TxnLazyCommitter::commit");
        err = txn->commit();
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::COMMIT>(err);
            ss << "failed to commit kv txn, txn_id=" << txn_id_ << " err=" << err;
            msg = ss.str();
            return {code, msg};
        }
    }
    return {MetaServiceCode::OK, ""};
}
std::pair<MetaServiceCode, std::string> TxnLazyCommitTask::wait() {
constexpr auto WAIT_FOR_MICROSECONDS = 5 * 1000000;
StopWatch sw;
uint64_t round = 0;
{
std::unique_lock lock(mutex_);
while (!finished_) {
if (cond_.wait_for(lock, WAIT_FOR_MICROSECONDS) == ETIMEDOUT) {
LOG(INFO) << "txn_id=" << txn_id_ << " wait_for 5s timeout round=" << ++round;
}
}
}
sw.pause();
g_bvar_txn_lazy_committer_waiting_duration << sw.elapsed_us();
if (sw.elapsed_us() > 1000000) {
LOG(INFO) << "txn_lazy_commit task wait more than 1000ms, cost=" << sw.elapsed_us() / 1000
<< " ms txn_id=" << txn_id_;
}
return std::make_pair(this->code_, this->msg_);
}
// Builds a committer with a ResourceManager created on top of `txn_kv`.
TxnLazyCommitter::TxnLazyCommitter(std::shared_ptr<TxnKv> txn_kv)
        : TxnLazyCommitter(txn_kv, std::make_shared<ResourceManager>(txn_kv)) {}

// Builds a committer and starts its two thread pools:
//   - the worker pool, sized by `config::txn_lazy_commit_num_threads`, which runs the lazy
//     commit tasks;
//   - the parallel commit pool, sized by `config::parallel_txn_lazy_commit_num_threads`. A
//     non-positive value means "use the hardware concurrency".
TxnLazyCommitter::TxnLazyCommitter(std::shared_ptr<TxnKv> txn_kv,
                                   std::shared_ptr<ResourceManager> resource_mgr)
        : txn_kv_(txn_kv), resource_mgr_(std::move(resource_mgr)) {
    worker_pool_ = std::make_unique<SimpleThreadPool>(config::txn_lazy_commit_num_threads,
                                                      "txn_lazy_commiter");
    worker_pool_->start();

    int32_t parallel_commit_threads = std::max(0, config::parallel_txn_lazy_commit_num_threads);
    if (parallel_commit_threads == 0) {
        // hardware_concurrency() is only a hint and returns 0 when the value is not computable;
        // never create a pool without threads.
        const unsigned int hw_threads = std::thread::hardware_concurrency();
        parallel_commit_threads = hw_threads == 0 ? 1 : static_cast<int32_t>(hw_threads);
    }
    parallel_commit_pool_ =
            std::make_shared<SimpleThreadPool>(parallel_commit_threads, "parallel_commit");
    parallel_commit_pool_->start();
}
/**
 * @brief Submit a lazy commit task for a txn, or join the one that is already registered
 *
 * If a task for `txn_id` is already registered as running, that task is returned and nothing
 * new is scheduled. Otherwise a new task is registered and handed to the worker pool; once its
 * commit() returns, the task unregisters itself from this committer.
 *
 * @param instance_id
 * @param txn_id
 * @return std::shared_ptr<TxnLazyCommitTask> the task to wait() on
 */
std::shared_ptr<TxnLazyCommitTask> TxnLazyCommitter::submit(const std::string& instance_id,
                                                            int64_t txn_id) {
    std::shared_ptr<TxnLazyCommitTask> task;
    {
        std::lock_guard<std::mutex> guard(mutex_);
        if (auto existing = running_tasks_.find(txn_id); existing != running_tasks_.end()) {
            return existing->second;
        }
        task = std::make_shared<TxnLazyCommitTask>(instance_id, txn_id, txn_kv_, this);
        running_tasks_.emplace(txn_id, task);
        g_bvar_txn_lazy_committer_submitted << 1;
    }

    // The closure holds its own reference to the task, so the task outlives its removal from
    // `running_tasks_`.
    worker_pool_->submit([task]() {
        task->commit();
        task->txn_lazy_committer_->remove(task->txn_id_);
        g_bvar_txn_lazy_committer_finished << 1;
    });
    DCHECK(task != nullptr);
    return task;
}
/**
 * @brief Unregister the lazy commit task of a txn
 *
 * Erases the entry of `txn_id` from the running task map under the committer mutex. It is a
 * no-op when no task is registered for that txn.
 *
 * @param txn_id
 */
void TxnLazyCommitter::remove(int64_t txn_id) {
    std::lock_guard<std::mutex> guard(mutex_);
    running_tasks_.erase(txn_id);
}
} // namespace doris::cloud