| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "meta_service.h" |
| |
| #include <brpc/channel.h> |
| #include <brpc/closure_guard.h> |
| #include <brpc/controller.h> |
| #include <bthread/bthread.h> |
| #include <fmt/core.h> |
| #include <gen_cpp/cloud.pb.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <google/protobuf/repeated_ptr_field.h> |
| #include <google/protobuf/util/json_util.h> |
| #include <rapidjson/prettywriter.h> |
| #include <rapidjson/schema.h> |
| |
| #include <algorithm> |
| #include <chrono> |
| #include <cstddef> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <functional> |
| #include <iomanip> |
| #include <ios> |
| #include <limits> |
| #include <memory> |
| #include <numeric> |
| #include <ostream> |
| #include <sstream> |
| #include <string> |
| #include <string_view> |
| #include <tuple> |
| #include <type_traits> |
| #include <unordered_map> |
| #include <utility> |
| |
| #include "common/bvars.h" |
| #include "common/config.h" |
| #include "common/encryption_util.h" |
| #include "common/logging.h" |
| #include "common/stats.h" |
| #include "common/stopwatch.h" |
| #include "common/string_util.h" |
| #include "common/util.h" |
| #include "cpp/sync_point.h" |
| #include "meta-service/delete_bitmap_lock_white_list.h" |
| #include "meta-service/doris_txn.h" |
| #include "meta-service/meta_service_helper.h" |
| #include "meta-service/meta_service_schema.h" |
| #include "meta-service/meta_service_tablet_stats.h" |
| #include "meta-store/blob_message.h" |
| #include "meta-store/clone_chain_reader.h" |
| #include "meta-store/codec.h" |
| #include "meta-store/document_message.h" |
| #include "meta-store/keys.h" |
| #include "meta-store/txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| #include "meta-store/versioned_value.h" |
| #include "rate-limiter/rate_limiter.h" |
| #include "resource-manager/resource_manager.h" |
| |
| using namespace std::chrono; |
| |
| namespace doris::cloud { |
| |
| MetaServiceImpl::MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv, |
| std::shared_ptr<ResourceManager> resource_mgr, |
| std::shared_ptr<RateLimiter> rate_limiter, |
| std::shared_ptr<SnapshotManager> snapshot_manager) |
| : txn_kv_(std::move(txn_kv)), |
| resource_mgr_(std::move(resource_mgr)), |
| rate_limiter_(std::move(rate_limiter)), |
| txn_lazy_committer_(std::make_shared<TxnLazyCommitter>(txn_kv_, resource_mgr_)), |
| delete_bitmap_lock_white_list_(std::make_shared<DeleteBitmapLockWhiteList>()), |
| snapshot_manager_(std::move(snapshot_manager)) { |
| rate_limiter_->init(this); |
| delete_bitmap_lock_white_list_->init(); |
| } |
| |
| MetaServiceImpl::~MetaServiceImpl() = default; |
| |
| void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stats, |
| std::unique_ptr<Transaction>& txn, MetaServiceCode& code, |
| std::string& msg); |
| |
| // FIXME(gavin): should it be a member function of ResourceManager? |
| std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr, |
| const std::string& cloud_unique_id) { |
| TEST_SYNC_POINT_RETURN_WITH_VALUE("get_instance_id", std::string {}); |
| |
| std::vector<NodeInfo> nodes; |
| std::string err = rc_mgr->get_node(cloud_unique_id, &nodes); |
| TEST_SYNC_POINT_CALLBACK("get_instance_id_err", &err); |
| |
| std::string instance_id; |
| if (!err.empty()) { |
| // cache can't find cloud_unique_id, so degraded by parse cloud_unique_id |
| // cloud_unique_id encode: ${version}:${instance_id}:${unique_id} |
| // check it split by ':' c |
| auto [valid, id] = ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id); |
| if (!valid) { |
| LOG(WARNING) << "use degraded format cloud_unique_id, but cloud_unique_id not degrade " |
| "format, cloud_unique_id=" |
| << cloud_unique_id; |
| return ""; |
| } |
| |
| // check instance_id valid by get fdb |
| if (config::enable_check_instance_id && !rc_mgr->is_instance_id_registered(id)) { |
| LOG(WARNING) << "use degraded format cloud_unique_id, but check instance failed, " |
| "cloud_unique_id=" |
| << cloud_unique_id; |
| return ""; |
| } |
| return id; |
| } |
| |
| for (auto& node : nodes) { |
| if (!instance_id.empty() && instance_id != node.instance_id) { |
| LOG(WARNING) << "cloud_unique_id is one-to-many instance_id, " |
| << " cloud_unique_id=" << cloud_unique_id |
| << " current_instance_id=" << instance_id |
| << " later_instance_id=" << node.instance_id; |
| } |
| instance_id = node.instance_id; // The last wins |
| // check cache unique_id |
| std::string cloud_unique_id_in_cache = node.node_info.cloud_unique_id(); |
| auto [valid, id] = |
| ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id_in_cache); |
| if (!valid) { |
| continue; |
| } |
| |
| if (id != node.instance_id || id != instance_id) { |
| LOG(WARNING) << "in cache, node=" << node.node_info.DebugString() |
| << ", cloud_unique_id=" << cloud_unique_id |
| << " current_instance_id=" << instance_id |
| << ", later_instance_id=" << node.instance_id; |
| continue; |
| } |
| } |
| |
| return instance_id; |
| } |
| |
| // Return `true` if tablet has been dropped; otherwise or it may not determine when meeting errors, return false |
| bool is_dropped_tablet(Transaction* txn, const std::string& instance_id, int64_t index_id, |
| int64_t partition_id) { |
| auto key = recycle_index_key({instance_id, index_id}); |
| std::string val; |
| TxnErrorCode err = txn->get(key, &val); |
| if (err == TxnErrorCode::TXN_OK) { |
| RecycleIndexPB pb; |
| if (!pb.ParseFromString(val)) [[unlikely]] { |
| LOG_WARNING("malformed recycle index pb").tag("key", hex(key)); |
| return false; |
| } |
| return pb.state() == RecycleIndexPB::DROPPED || pb.state() == RecycleIndexPB::RECYCLING; |
| } |
| if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { // Get kv failed, cannot determine, return false |
| return false; |
| } |
| key = recycle_partition_key({instance_id, partition_id}); |
| err = txn->get(key, &val); |
| if (err == TxnErrorCode::TXN_OK) { |
| RecyclePartitionPB pb; |
| if (!pb.ParseFromString(val)) [[unlikely]] { |
| LOG_WARNING("malformed recycle partition pb").tag("key", hex(key)); |
| return false; |
| } |
| return pb.state() == RecyclePartitionPB::DROPPED || |
| pb.state() == RecyclePartitionPB::RECYCLING; |
| } |
| return false; |
| } |
| |
| void get_tablet_idx(MetaServiceCode& code, std::string& msg, Transaction* txn, |
| const std::string& instance_id, int64_t tablet_id, TabletIndexPB& tablet_idx) { |
| std::string key, val; |
| meta_tablet_idx_key({instance_id, tablet_id}, &key); |
| TxnErrorCode err = txn->get(key, &val); |
| if (err != TxnErrorCode::TXN_OK) { |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = MetaServiceCode::TABLET_NOT_FOUND; |
| } else { |
| code = cast_as<ErrCategory::READ>(err); |
| } |
| msg = fmt::format("failed to get tablet_idx, err={} tablet_id={} key={}", err, tablet_id, |
| hex(key)); |
| return; |
| } |
| if (!tablet_idx.ParseFromString(val)) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("malformed tablet index value, tablet_id={} key={}", tablet_id, hex(key)); |
| return; |
| } |
| if (tablet_id != tablet_idx.tablet_id()) [[unlikely]] { |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = "internal error"; |
| LOG(WARNING) << "unexpected error given_tablet_id=" << tablet_id |
| << " idx_pb_tablet_id=" << tablet_idx.tablet_id() << " key=" << hex(key); |
| return; |
| } |
| } |
| |
| void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller, |
| const GetVersionRequest* request, GetVersionResponse* response, |
| ::google::protobuf::Closure* done) { |
| if (request->batch_mode()) { |
| batch_get_version(controller, request, response, done); |
| return; |
| } |
| |
| RPC_PREPROCESS(get_version, get); |
| std::string cloud_unique_id; |
| if (request->has_cloud_unique_id()) { |
| cloud_unique_id = request->cloud_unique_id(); |
| } |
| |
| bool is_table_version = false; |
| if (request->has_is_table_version()) { |
| is_table_version = request->is_table_version(); |
| } |
| |
| int64_t db_id = request->has_db_id() ? request->db_id() : -1; |
| int64_t table_id = request->has_table_id() ? request->table_id() : -1; |
| int64_t partition_id = request->has_partition_id() ? request->partition_id() : -1; |
| if (db_id == -1 || table_id == -1 || (!is_table_version && partition_id == -1)) { |
| msg = "params error, db_id=" + std::to_string(db_id) + |
| " table_id=" + std::to_string(table_id) + |
| " partition_id=" + std::to_string(partition_id) + |
| " is_table_version=" + std::to_string(is_table_version); |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| RPC_RATE_LIMIT(get_version) |
| |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| if (is_versioned_read) { |
| CloneChainReader reader(instance_id, txn_kv_.get(), resource_mgr_.get()); |
| if (is_table_version) { |
| Versionstamp table_version; |
| TxnErrorCode err = reader.get_table_version(table_id, &table_version); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| msg = "table version not found"; |
| code = MetaServiceCode::VERSION_NOT_FOUND; |
| return; |
| } else if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get table version, err={} table_id={}", err, table_id); |
| return; |
| } |
| response->set_version(table_version.version()); |
| } else { |
| VersionPB partition_version; |
| TxnErrorCode err = |
| reader.get_partition_version(partition_id, &partition_version, nullptr); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| msg = "partition version not found"; |
| code = MetaServiceCode::VERSION_NOT_FOUND; |
| return; |
| } else if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get partition version, err={} table_id={}", err, |
| table_id); |
| return; |
| } |
| |
| if (request->wait_for_pending_txn() && partition_version.pending_txn_ids_size() > 0) { |
| DCHECK_EQ(partition_version.pending_txn_ids_size(), 1) |
| << "only support one pending txn id for now, version=" |
| << partition_version.ShortDebugString(); |
| int64_t first_txn_id = partition_version.pending_txn_ids(0); |
| std::shared_ptr<TxnLazyCommitTask> task = |
| txn_lazy_committer_->submit(instance_id, first_txn_id); |
| std::tie(code, msg) = task->wait(); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "wait for pending txn " << first_txn_id |
| << " failed, code=" << code << " msg=" << msg; |
| return; |
| } |
| partition_version.set_version(partition_version.version() + 1); |
| partition_version.clear_pending_txn_ids(); |
| } |
| |
| response->set_version(partition_version.version()); |
| response->add_version_update_time_ms(partition_version.update_time_ms()); |
| } |
| TEST_SYNC_POINT_CALLBACK("get_version_code", &code); |
| return; |
| } |
| |
| std::string ver_key; |
| if (is_table_version) { |
| table_version_key({instance_id, db_id, table_id}, &ver_key); |
| } else { |
| partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key); |
| } |
| |
| code = MetaServiceCode::OK; |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| msg = "failed to create txn"; |
| code = cast_as<ErrCategory::CREATE>(err); |
| return; |
| } |
| |
| std::string ver_val; |
| // 0 for success get a key, 1 for key not found, negative for error |
| err = txn->get(ver_key, &ver_val); |
| VLOG_DEBUG << "xxx get version_key=" << hex(ver_key); |
| if (err == TxnErrorCode::TXN_OK) { |
| if (is_table_version) { |
| int64_t version = 0; |
| if (!txn->decode_atomic_int(ver_val, &version)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "malformed table version value"; |
| return; |
| } |
| response->set_version(version); |
| } else { |
| VersionPB version_pb; |
| if (!version_pb.ParseFromString(ver_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "malformed version value"; |
| return; |
| } |
| if (!version_pb.has_version()) { |
| msg = "not found"; |
| code = MetaServiceCode::VERSION_NOT_FOUND; |
| return; |
| } |
| |
| if (request->wait_for_pending_txn() && version_pb.pending_txn_ids_size() > 0) { |
| DCHECK_EQ(version_pb.pending_txn_ids_size(), 1) |
| << "only support one pending txn id for now, version=" |
| << version_pb.ShortDebugString(); |
| int64_t first_txn_id = version_pb.pending_txn_ids(0); |
| std::shared_ptr<TxnLazyCommitTask> task = |
| txn_lazy_committer_->submit(instance_id, first_txn_id); |
| std::tie(code, msg) = task->wait(); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "wait for pending txn " << first_txn_id |
| << " failed, code=" << code << " msg=" << msg; |
| return; |
| } |
| version_pb.set_version(version_pb.version() + 1); |
| version_pb.clear_pending_txn_ids(); |
| } |
| |
| response->set_version(version_pb.version()); |
| response->add_version_update_time_ms(version_pb.update_time_ms()); |
| } |
| TEST_SYNC_POINT_CALLBACK("get_version_code", &code); |
| return; |
| } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| msg = "not found"; |
| code = MetaServiceCode::VERSION_NOT_FOUND; |
| return; |
| } |
| msg = fmt::format("failed to get txn, err={}", err); |
| code = cast_as<ErrCategory::READ>(err); |
| } |
| |
| void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* controller, |
| const GetVersionRequest* request, |
| GetVersionResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_version, get); |
| |
| std::string cloud_unique_id; |
| if (request->has_cloud_unique_id()) { |
| cloud_unique_id = request->cloud_unique_id(); |
| } |
| |
| bool is_table_version = false; |
| if (request->has_is_table_version()) { |
| is_table_version = request->is_table_version(); |
| } |
| |
| if (request->db_ids_size() == 0 || request->table_ids_size() == 0 || |
| (!is_table_version && request->table_ids_size() != request->partition_ids_size()) || |
| (!is_table_version && request->db_ids_size() != request->partition_ids_size())) { |
| msg = "param error, num db_ids=" + std::to_string(request->db_ids_size()) + |
| " num table_ids=" + std::to_string(request->table_ids_size()) + |
| " num partition_ids=" + std::to_string(request->partition_ids_size()) + |
| " is_table_version=" + std::to_string(request->is_table_version()); |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| |
| if (is_version_read_enabled(instance_id)) { |
| if (is_table_version) { |
| std::tie(code, msg) = batch_get_table_versions(request, response, instance_id, stats); |
| } else { |
| std::tie(code, msg) = |
| batch_get_partition_versions(request, response, instance_id, stats); |
| } |
| TEST_SYNC_POINT_CALLBACK("batch_get_version_code", &code); |
| if (code != MetaServiceCode::OK) { |
| response->clear_partition_ids(); |
| response->clear_table_ids(); |
| response->clear_versions(); |
| response->clear_db_ids(); |
| } |
| return; |
| } |
| |
| size_t num_acquired = |
| is_table_version ? request->table_ids_size() : request->partition_ids_size(); |
| response->mutable_versions()->Reserve(num_acquired); |
| response->mutable_db_ids()->CopyFrom(request->db_ids()); |
| response->mutable_table_ids()->CopyFrom(request->table_ids()); |
| if (!is_table_version) { |
| response->mutable_partition_ids()->CopyFrom(request->partition_ids()); |
| } |
| |
| constexpr size_t BATCH_SIZE = 500; |
| std::vector<std::string> version_keys; |
| std::vector<std::optional<std::string>> version_values; |
| version_keys.reserve(BATCH_SIZE); |
| version_values.reserve(BATCH_SIZE); |
| |
| while ((code == MetaServiceCode::OK || code == MetaServiceCode::KV_TXN_TOO_OLD) && |
| response->versions_size() < num_acquired) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| msg = "failed to create txn"; |
| code = cast_as<ErrCategory::CREATE>(err); |
| break; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| }; |
| for (size_t i = response->versions_size(); i < num_acquired; i += BATCH_SIZE) { |
| size_t limit = (i + BATCH_SIZE < num_acquired) ? i + BATCH_SIZE : num_acquired; |
| version_keys.clear(); |
| version_values.clear(); |
| for (size_t j = i; j < limit; j++) { |
| int64_t db_id = request->db_ids(j); |
| int64_t table_id = request->table_ids(j); |
| std::string ver_key; |
| if (is_table_version) { |
| table_version_key({instance_id, db_id, table_id}, &ver_key); |
| } else { |
| int64_t partition_id = request->partition_ids(j); |
| partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key); |
| } |
| version_keys.push_back(std::move(ver_key)); |
| } |
| |
| err = txn->batch_get(&version_values, version_keys); |
| TEST_SYNC_POINT_CALLBACK("batch_get_version_err", &err); |
| if (err == TxnErrorCode::TXN_TOO_OLD) { |
| // txn too old, fallback to non-snapshot versions. |
| LOG(WARNING) << "batch_get_version execution time exceeds the txn mvcc window, " |
| "fallback to acquire non-snapshot versions, partition_ids_size=" |
| << request->partition_ids_size() << ", index=" << i; |
| break; |
| } else if (err != TxnErrorCode::TXN_OK) { |
| msg = fmt::format("failed to batch get versions, index={}, err={}", i, err); |
| code = cast_as<ErrCategory::READ>(err); |
| break; |
| } |
| |
| if (is_table_version) { |
| for (auto&& value : version_values) { |
| if (!value.has_value()) { |
| // return -1 if the target version is not exists. |
| response->add_versions(-1); |
| } else { |
| int64_t version = 0; |
| if (!txn->decode_atomic_int(*value, &version)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "malformed table version value"; |
| break; |
| } |
| response->add_versions(version); |
| } |
| } |
| } else { |
| std::vector<VersionPB> partition_versions; |
| for (auto&& value : version_values) { |
| VersionPB version_pb; |
| if (!value.has_value()) { |
| // return -1 if the target version is not exists. |
| version_pb.set_version(-1); |
| version_pb.set_update_time_ms(-1); |
| } else if (!version_pb.ParseFromString(*value)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "malformed version value"; |
| break; |
| } |
| partition_versions.push_back(std::move(version_pb)); |
| } |
| if (code != MetaServiceCode::OK) { |
| break; |
| } |
| if (request->wait_for_pending_txn()) { |
| std::tie(code, msg) = wait_for_pending_txns(instance_id, partition_versions); |
| if (code != MetaServiceCode::OK) { |
| break; |
| } |
| } |
| for (auto& version_pb : partition_versions) { |
| response->add_versions(version_pb.version()); |
| response->add_version_update_time_ms(version_pb.update_time_ms()); |
| } |
| } |
| } |
| } |
| if (code != MetaServiceCode::OK) { |
| response->clear_partition_ids(); |
| response->clear_table_ids(); |
| response->clear_versions(); |
| } |
| } |
| |
| std::pair<MetaServiceCode, std::string> MetaServiceImpl::batch_get_table_versions( |
| const GetVersionRequest* request, GetVersionResponse* response, |
| std::string_view instance_id, KVStats& stats) { |
| size_t num_acquired = request->table_ids_size(); |
| response->mutable_versions()->Reserve(num_acquired); |
| response->mutable_db_ids()->CopyFrom(request->db_ids()); |
| response->mutable_table_ids()->CopyFrom(request->table_ids()); |
| response->mutable_partition_ids()->CopyFrom(request->partition_ids()); |
| |
| constexpr size_t BATCH_SIZE = 500; |
| CloneChainReader reader(instance_id, txn_kv_.get(), resource_mgr_.get()); |
| std::vector<int64_t> acquired_ids; |
| acquired_ids.reserve(BATCH_SIZE); |
| |
| while (response->versions_size() < num_acquired) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return {cast_as<ErrCategory::CREATE>(err), "failed to create txn"}; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| }; |
| for (size_t i = response->versions_size(); i < num_acquired; i += BATCH_SIZE) { |
| size_t limit = (i + BATCH_SIZE < num_acquired) ? i + BATCH_SIZE : num_acquired; |
| acquired_ids.clear(); |
| for (size_t j = i; j < limit; j++) { |
| acquired_ids.push_back(request->table_ids(j)); |
| } |
| std::unordered_map<int64_t, Versionstamp> table_versions; |
| err = reader.get_table_versions(acquired_ids, &table_versions, true); |
| TEST_SYNC_POINT_CALLBACK("batch_get_version_err", &err); |
| if (err == TxnErrorCode::TXN_TOO_OLD) { |
| // txn too old, fallback to non-snapshot versions. |
| LOG(WARNING) << "batch_get_version execution time exceeds the txn mvcc window, " |
| "fallback to acquire non-snapshot versions, table_ids_size=" |
| << request->table_ids_size() << ", index=" << i; |
| break; |
| } else if (err != TxnErrorCode::TXN_OK) { |
| return {cast_as<ErrCategory::READ>(err), |
| fmt::format("failed to batch get versions, index={}, err={}", i, err)}; |
| } |
| for (auto& acquired_id : acquired_ids) { |
| auto it = table_versions.find(acquired_id); |
| if (it == table_versions.end()) { |
| // return -1 if the target version is not exists. |
| response->add_versions(-1); |
| } else { |
| response->add_versions(it->second.version()); |
| } |
| } |
| } |
| } |
| |
| return {MetaServiceCode::OK, ""}; |
| } |
| |
| std::pair<MetaServiceCode, std::string> MetaServiceImpl::batch_get_partition_versions( |
| const GetVersionRequest* request, GetVersionResponse* response, |
| std::string_view instance_id, KVStats& stats) { |
| size_t num_acquired = request->partition_ids_size(); |
| response->mutable_versions()->Reserve(num_acquired); |
| response->mutable_db_ids()->CopyFrom(request->db_ids()); |
| response->mutable_table_ids()->CopyFrom(request->table_ids()); |
| |
| constexpr size_t BATCH_SIZE = 500; |
| |
| CloneChainReader reader(instance_id, txn_kv_.get(), resource_mgr_.get()); |
| std::vector<int64_t> acquired_ids; |
| acquired_ids.reserve(BATCH_SIZE); |
| |
| while (response->versions_size() < num_acquired) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return {cast_as<ErrCategory::CREATE>(err), "failed to create txn"}; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| }; |
| for (size_t i = response->versions_size(); i < num_acquired; i += BATCH_SIZE) { |
| size_t limit = (i + BATCH_SIZE < num_acquired) ? i + BATCH_SIZE : num_acquired; |
| acquired_ids.clear(); |
| for (size_t j = i; j < limit; j++) { |
| acquired_ids.push_back(request->partition_ids(j)); |
| } |
| std::unordered_map<int64_t, VersionPB> partition_version_map; |
| std::unordered_map<int64_t, Versionstamp> versionstamps; |
| err = reader.get_partition_versions(acquired_ids, &partition_version_map, |
| &versionstamps, true); |
| if (err == TxnErrorCode::TXN_TOO_OLD) { |
| // txn too old, fallback to non-snapshot versions. |
| LOG(WARNING) << "batch_get_version execution time exceeds the txn mvcc window, " |
| "fallback to acquire non-snapshot versions, partition_ids_size=" |
| << request->partition_ids_size() << ", index=" << i; |
| break; |
| } else if (err != TxnErrorCode::TXN_OK) { |
| return {cast_as<ErrCategory::READ>(err), |
| fmt::format("failed to batch get versions, index={}, err={}", i, err)}; |
| } |
| |
| std::vector<VersionPB> partition_versions; |
| for (auto& acquired_id : acquired_ids) { |
| auto it = partition_version_map.find(acquired_id); |
| if (it == partition_version_map.end()) { |
| // return -1 if the target version is not exists. |
| VersionPB version_pb; |
| version_pb.set_version(-1); |
| version_pb.set_update_time_ms(-1); |
| partition_versions.push_back(std::move(version_pb)); |
| } else { |
| partition_versions.push_back(it->second); |
| } |
| } |
| if (request->wait_for_pending_txn()) { |
| auto&& [code, msg] = |
| wait_for_pending_txns(std::string(instance_id), partition_versions); |
| if (code != MetaServiceCode::OK) { |
| return {code, msg}; |
| } |
| } |
| for (auto& version_pb : partition_versions) { |
| response->add_versions(version_pb.version()); |
| response->add_version_update_time_ms(version_pb.update_time_ms()); |
| } |
| } |
| } |
| |
| return {MetaServiceCode::OK, ""}; |
| } |
| |
| void internal_create_tablet(const CreateTabletsRequest* request, MetaServiceCode& code, |
| std::string& msg, const doris::TabletMetaCloudPB& meta, |
| std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, |
| std::set<std::pair<int64_t, int32_t>>& saved_schema, KVStats& stats, |
| bool is_versioned_write) { |
| doris::TabletMetaCloudPB tablet_meta(meta); |
| bool has_first_rowset = tablet_meta.rs_metas_size() > 0; |
| |
| // TODO: validate tablet meta, check existence |
| int64_t table_id = tablet_meta.table_id(); |
| int64_t index_id = tablet_meta.index_id(); |
| int64_t partition_id = tablet_meta.partition_id(); |
| int64_t tablet_id = tablet_meta.tablet_id(); |
| |
| if (!tablet_meta.has_schema() && !tablet_meta.has_schema_version()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "tablet_meta must have either schema or schema_version"; |
| return; |
| } |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.put_bytes += txn->put_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| stats.put_counter += txn->num_put_keys(); |
| }; |
| |
| std::string rs_key, rs_val; |
| if (has_first_rowset) { |
| // Put first rowset if needed |
| auto first_rowset = tablet_meta.mutable_rs_metas(0); |
| if (config::write_schema_kv) { // detach schema from rowset meta |
| first_rowset->set_index_id(index_id); |
| first_rowset->set_schema_version(tablet_meta.has_schema_version() |
| ? tablet_meta.schema_version() |
| : tablet_meta.schema().schema_version()); |
| first_rowset->set_allocated_tablet_schema(nullptr); |
| } |
| MetaRowsetKeyInfo rs_key_info {instance_id, tablet_id, first_rowset->end_version()}; |
| meta_rowset_key(rs_key_info, &rs_key); |
| if (!first_rowset->SerializeToString(&rs_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize first rowset meta"; |
| return; |
| } |
| txn->put(rs_key, rs_val); |
| if (is_versioned_write) { |
| std::string meta_rowset_key = versioned::meta_rowset_key( |
| {instance_id, tablet_id, first_rowset->rowset_id_v2()}); |
| blob_put(txn.get(), meta_rowset_key, rs_val, 0); |
| |
| std::string rowset_ref_count_key = versioned::data_rowset_ref_count_key( |
| {instance_id, tablet_id, first_rowset->rowset_id_v2()}); |
| txn->atomic_add(rowset_ref_count_key, 1); |
| std::string versioned_rs_key = versioned::meta_rowset_load_key( |
| {instance_id, tablet_id, first_rowset->end_version()}); |
| if (!versioned::document_put(txn.get(), versioned_rs_key, std::move(*first_rowset))) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = fmt::format("failed to serialize versioned rowset meta, key={}", |
| hex(versioned_rs_key)); |
| return; |
| } |
| LOG(INFO) << "put first versioned rowset meta, tablet_id=" << tablet_id |
| << " rowset_id=" << first_rowset->rowset_id_v2() |
| << " end_version=" << first_rowset->end_version() |
| << " key=" << hex(versioned_rs_key) |
| << " rowset_ref_count_key=" << hex(rowset_ref_count_key) |
| << " meta_rowset_key=" << hex(meta_rowset_key); |
| } |
| |
| tablet_meta.clear_rs_metas(); // Strip off rowset meta |
| } |
| |
| if (tablet_meta.has_schema()) { |
| // Temporary hard code to fix wrong column type string generated by FE |
| auto fix_column_type = [](doris::TabletSchemaCloudPB* schema) { |
| for (auto& column : *schema->mutable_column()) { |
| if (column.type() == "DECIMAL128") { |
| column.mutable_type()->push_back('I'); |
| } |
| } |
| }; |
| if (config::write_schema_kv) { |
| // detach TabletSchemaCloudPB from TabletMetaCloudPB |
| tablet_meta.set_schema_version(tablet_meta.schema().schema_version()); |
| auto [_, success] = saved_schema.emplace(index_id, tablet_meta.schema_version()); |
| if (success) { // schema may not be saved |
| fix_column_type(tablet_meta.mutable_schema()); |
| auto schema_key = |
| meta_schema_key({instance_id, index_id, tablet_meta.schema_version()}); |
| put_schema_kv(code, msg, txn.get(), schema_key, tablet_meta.schema()); |
| if (code != MetaServiceCode::OK) return; |
| if (is_versioned_write) { |
| auto versioned_schema_key = versioned::meta_schema_key( |
| {instance_id, index_id, tablet_meta.schema_version()}); |
| put_versioned_schema_kv(code, msg, txn.get(), versioned_schema_key, |
| tablet_meta.schema()); |
| if (code != MetaServiceCode::OK) return; |
| } |
| } |
| tablet_meta.set_allocated_schema(nullptr); |
| } else { |
| fix_column_type(tablet_meta.mutable_schema()); |
| } |
| } |
| |
| MetaTabletKeyInfo key_info {instance_id, table_id, index_id, partition_id, tablet_id}; |
| std::string key; |
| std::string val; |
| meta_tablet_key(key_info, &key); |
| |
| err = txn->get(key, &val); |
| TEST_SYNC_POINT_CALLBACK("meta_service_test:get_meta_tablet_key_error", &err); |
| if (err == TxnErrorCode::TXN_OK) { |
| doris::TabletMetaCloudPB exists_tablet_meta; |
| if (!exists_tablet_meta.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = fmt::format("malformed tablet meta, unable to initialize, key={}", hex(key)); |
| return; |
| } |
| if (exists_tablet_meta.tablet_id() == tablet_meta.tablet_id() && |
| exists_tablet_meta.schema_version() == tablet_meta.schema_version()) { |
| // idempotent |
| code = MetaServiceCode::OK; |
| msg = fmt::format("tablet already exists, tablet_id={} schema_version={} key={}", |
| tablet_id, tablet_meta.schema_version(), hex(key)); |
| LOG(WARNING) << msg; |
| return; |
| } |
| } |
| if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = "failed to get tablet key, key=" + hex(key); |
| LOG(WARNING) << msg; |
| return; |
| } |
| if (!tablet_meta.SerializeToString(&val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize tablet meta"; |
| return; |
| } |
| txn->put(key, val); |
| LOG(INFO) << "xxx put tablet_key=" << hex(key) << " tablet id " << tablet_id; |
| if (is_versioned_write) { |
| std::string versioned_tablet_key = versioned::meta_tablet_key({instance_id, tablet_id}); |
| if (!versioned::document_put(txn.get(), versioned_tablet_key, std::move(tablet_meta))) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = fmt::format("failed to serialize versioned tablet meta, key={}", |
| hex(versioned_tablet_key)); |
| return; |
| } |
| LOG(INFO) << "put versioned tablet meta, tablet_id=" << tablet_id |
| << " key=" << hex(versioned_tablet_key); |
| } |
| |
| // Index tablet_id -> table_id, index_id, partition_id |
| std::string key1; |
| std::string val1; |
| MetaTabletIdxKeyInfo key_info1 {instance_id, tablet_id}; |
| meta_tablet_idx_key(key_info1, &key1); |
| TabletIndexPB tablet_table; |
| if (request->has_db_id()) { |
| tablet_table.set_db_id(request->db_id()); |
| } |
| tablet_table.set_table_id(table_id); |
| tablet_table.set_index_id(index_id); |
| tablet_table.set_partition_id(partition_id); |
| tablet_table.set_tablet_id(tablet_id); |
| if (!tablet_table.SerializeToString(&val1)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize tablet table value"; |
| return; |
| } |
| txn->put(key1, val1); |
| LOG(INFO) << "put tablet_idx tablet_id=" << tablet_id << " key=" << hex(key1); |
| if (is_versioned_write) { |
| if (!request->has_db_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "db_id is required for versioned write, please upgrade your FE version"; |
| return; |
| } |
| int64_t db_id = request->db_id(); |
| std::string tablet_idx_key = versioned::tablet_index_key({instance_id, tablet_id}); |
| std::string tablet_inverted_idx_key = versioned::tablet_inverted_index_key( |
| {instance_id, db_id, table_id, index_id, partition_id, tablet_id}); |
| txn->put(tablet_idx_key, val1); |
| txn->put(tablet_inverted_idx_key, ""); |
| LOG(INFO) << "put versioned tablet index, tablet_id=" << tablet_id |
| << " key=" << hex(tablet_idx_key) |
| << " inverted_key=" << hex(tablet_inverted_idx_key); |
| } |
| |
| // Create stats info for the tablet |
| auto stats_key = stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| std::string stats_val; |
| TabletStatsPB stats_pb; |
| stats_pb.set_num_rowsets(1); |
| stats_pb.set_num_segments(0); |
| if (request->has_db_id()) { |
| stats_pb.mutable_idx()->set_db_id(request->db_id()); |
| } |
| stats_pb.mutable_idx()->set_table_id(table_id); |
| stats_pb.mutable_idx()->set_index_id(index_id); |
| stats_pb.mutable_idx()->set_partition_id(partition_id); |
| stats_pb.mutable_idx()->set_tablet_id(tablet_id); |
| stats_pb.set_base_compaction_cnt(0); |
| stats_pb.set_cumulative_compaction_cnt(0); |
| // set cumulative point to 2 to not compact rowset [0-1] |
| stats_pb.set_cumulative_point(2); |
| stats_val = stats_pb.SerializeAsString(); |
| DCHECK(!stats_val.empty()); |
| txn->put(stats_key, stats_val); |
| LOG(INFO) << "put tablet stats, tablet_id=" << tablet_id << " table_id=" << table_id |
| << " index_id=" << index_id << " partition_id=" << partition_id |
| << " key=" << hex(stats_key); |
| if (is_versioned_write) { |
| std::string compact_stats_key = |
| versioned::tablet_compact_stats_key({instance_id, tablet_id}); |
| if (!versioned::document_put(txn.get(), compact_stats_key, std::move(stats_pb))) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = fmt::format("failed to serialize versioned tablet compact stats, key={}", |
| hex(compact_stats_key)); |
| return; |
| } |
| |
| // The load stats is initialized with zero values |
| TabletStatsPB empty_stats_pb; |
| std::string load_stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id}); |
| if (!versioned::document_put(txn.get(), load_stats_key, std::move(empty_stats_pb))) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = fmt::format("failed to serialize versioned tablet stats, key={}", |
| hex(load_stats_key)); |
| return; |
| } |
| |
| LOG(INFO) << "put versioned tablet load and compact stats, tablet_id=" << tablet_id |
| << " load_stats_key=" << hex(load_stats_key) |
| << " compact_stats_key=" << hex(compact_stats_key); |
| } |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to save tablet meta, ret={}", err); |
| return; |
| } |
| } |
| |
| void MetaServiceImpl::create_tablets(::google::protobuf::RpcController* controller, |
| const CreateTabletsRequest* request, |
| CreateTabletsResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(create_tablets, get, put); |
| |
| if (request->tablet_metas_size() == 0) { |
| msg = "no tablet meta"; |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| return; |
| } |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| |
| RPC_RATE_LIMIT(create_tablets) |
| for (; request->has_storage_vault_name();) { |
| InstanceInfoPB instance; |
| std::unique_ptr<Transaction> txn0; |
| TxnErrorCode err = txn_kv_->create_txn(&txn0); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to create txn"); |
| return; |
| } |
| |
| InstanceKeyInfo key_info {instance_id}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| |
| err = txn0->get(key, &val); |
| stats.get_bytes += val.size() + key.size(); |
| stats.get_counter++; |
| LOG(INFO) << "get instance_key=" << hex(key); |
| |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get instance, instance_id=" << instance_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| if (!instance.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse InstanceInfoPB"; |
| return; |
| } |
| |
| // This instance hasn't enable storage vault which means it's using legacy cloud mode |
| DCHECK(instance.enable_storage_vault()) |
| << "Only instances with enable_storage_vault true have vault name"; |
| |
| std::string_view name = request->storage_vault_name(); |
| |
| // Try to use the default vault name if user doesn't specify the vault name |
| // for correspoding table |
| if (name.empty()) { |
| if (!instance.has_default_storage_vault_name()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("You must supply at least one default vault"); |
| return; |
| } |
| name = instance.default_storage_vault_name(); |
| } |
| |
| // The S3 vault would be stored inside the instance.storage_vault_names and instance.resource_ids |
| auto vault_name = std::find_if( |
| instance.storage_vault_names().begin(), instance.storage_vault_names().end(), |
| [&](const auto& candidate_name) { return candidate_name == name; }); |
| if (vault_name != instance.storage_vault_names().end()) { |
| auto idx = vault_name - instance.storage_vault_names().begin(); |
| response->set_storage_vault_id(instance.resource_ids().at(idx)); |
| response->set_storage_vault_name(*vault_name); |
| break; |
| } |
| |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("failed to get vault id, vault name={}", name); |
| return; |
| } |
| |
| // [index_id, schema_version] |
| std::set<std::pair<int64_t, int32_t>> saved_schema; |
| TEST_SYNC_POINT_RETURN_WITH_VOID("create_tablets"); |
| bool is_versioned_write = is_version_write_enabled(instance_id); |
| for (auto& tablet_meta : request->tablet_metas()) { |
| internal_create_tablet(request, code, msg, tablet_meta, txn_kv_, instance_id, saved_schema, |
| stats, is_versioned_write); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| } |
| } |
| |
| void internal_get_tablet(MetaServiceCode& code, std::string& msg, const std::string& instance_id, |
| Transaction* txn, int64_t tablet_id, doris::TabletMetaCloudPB* tablet_meta, |
| bool skip_schema) { |
| // TODO: validate request |
| TabletIndexPB tablet_idx; |
| get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx); |
| if (code != MetaServiceCode::OK) return; |
| |
| MetaTabletKeyInfo key_info1 {instance_id, tablet_idx.table_id(), tablet_idx.index_id(), |
| tablet_idx.partition_id(), tablet_id}; |
| std::string key1, val1; |
| meta_tablet_key(key_info1, &key1); |
| TxnErrorCode err = txn->get(key1, &val1); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = MetaServiceCode::TABLET_NOT_FOUND; |
| msg = "failed to get tablet, err=not found"; |
| return; |
| } else if (err != TxnErrorCode::TXN_OK) [[unlikely]] { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet, err={}", err); |
| return; |
| } |
| |
| if (!tablet_meta->ParseFromString(val1)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "malformed tablet meta, unable to initialize"; |
| return; |
| } |
| |
| if (tablet_meta->has_schema() && |
| tablet_meta->schema().column_size() > 0) { // tablet meta saved before detach schema kv |
| tablet_meta->set_schema_version(tablet_meta->schema().schema_version()); |
| } |
| |
| if ((!tablet_meta->has_schema() || |
| (tablet_meta->has_schema() && tablet_meta->schema().column_size() <= 0)) && |
| !skip_schema) { |
| if (!tablet_meta->has_schema_version()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "tablet_meta must have either schema or schema_version"; |
| return; |
| } |
| auto key = meta_schema_key( |
| {instance_id, tablet_meta->index_id(), tablet_meta->schema_version()}); |
| ValueBuf val_buf; |
| err = cloud::blob_get(txn, key, &val_buf); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get schema, err={}", err == TxnErrorCode::TXN_KEY_NOT_FOUND |
| ? "not found" |
| : "internal error"); |
| return; |
| } |
| if (!parse_schema_value(val_buf, tablet_meta->mutable_schema())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("malformed schema value, key={}", key); |
| return; |
| } |
| } |
| } |
| |
| static bool try_fetch_and_parse_schema(Transaction* txn, TabletSchemaCloudPB* schema, |
| const std::string& instance_id, int64_t index_id, |
| int64_t schema_version, MetaServiceCode& code, |
| std::string& msg, bool is_versioned_read, |
| ResourceManager* resource_mgr) { |
| if (!is_versioned_read) { |
| std::string key = meta_schema_key({instance_id, index_id, schema_version}); |
| ValueBuf val_buf; |
| TxnErrorCode err = cloud::blob_get(txn, key, &val_buf); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get schema, schema_version={}: {}", schema_version, err); |
| return false; |
| } |
| if (!parse_schema_value(val_buf, schema)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("malformed schema value, key={}", key); |
| return false; |
| } |
| } else { |
| CloneChainReader reader(instance_id, resource_mgr); |
| TxnErrorCode err = reader.get_tablet_schema(txn, index_id, schema_version, schema); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get versioned tablet schema, err={}", err); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| void MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controller, |
| const UpdateTabletRequest* request, |
| UpdateTabletResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(update_tablet, get, put); |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(WARNING) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| RPC_RATE_LIMIT(update_tablet) |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| UpdateTabletLogPB update_tablet_log; |
| bool is_versioned_write = is_version_write_enabled(instance_id); |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| CloneChainReader reader(instance_id, txn_kv_.get(), resource_mgr_.get()); |
| for (const TabletMetaInfoPB& tablet_meta_info : request->tablet_meta_infos()) { |
| doris::TabletMetaCloudPB tablet_meta; |
| if (!is_versioned_read) { |
| internal_get_tablet(code, msg, instance_id, txn.get(), tablet_meta_info.tablet_id(), |
| &tablet_meta, true); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| } else { |
| TxnErrorCode err = reader.get_tablet_meta(txn.get(), tablet_meta_info.tablet_id(), |
| &tablet_meta, nullptr); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get versioned tablet meta, err={}", err); |
| return; |
| } |
| } |
| if (tablet_meta_info.has_is_in_memory()) { // deprecate after 3.0.0 |
| tablet_meta.set_is_in_memory(tablet_meta_info.is_in_memory()); |
| } else if (tablet_meta_info.has_is_persistent()) { // deprecate after 3.0.0 |
| tablet_meta.set_is_persistent(tablet_meta_info.is_persistent()); |
| } else if (tablet_meta_info.has_ttl_seconds()) { |
| tablet_meta.set_ttl_seconds(tablet_meta_info.ttl_seconds()); |
| } else if (tablet_meta_info.has_compaction_policy()) { |
| tablet_meta.set_compaction_policy(tablet_meta_info.compaction_policy()); |
| } else if (tablet_meta_info.has_time_series_compaction_goal_size_mbytes()) { |
| tablet_meta.set_time_series_compaction_goal_size_mbytes( |
| tablet_meta_info.time_series_compaction_goal_size_mbytes()); |
| } else if (tablet_meta_info.has_time_series_compaction_file_count_threshold()) { |
| tablet_meta.set_time_series_compaction_file_count_threshold( |
| tablet_meta_info.time_series_compaction_file_count_threshold()); |
| } else if (tablet_meta_info.has_time_series_compaction_time_threshold_seconds()) { |
| tablet_meta.set_time_series_compaction_time_threshold_seconds( |
| tablet_meta_info.time_series_compaction_time_threshold_seconds()); |
| } else if (tablet_meta_info.has_time_series_compaction_empty_rowsets_threshold()) { |
| tablet_meta.set_time_series_compaction_empty_rowsets_threshold( |
| tablet_meta_info.time_series_compaction_empty_rowsets_threshold()); |
| } else if (tablet_meta_info.has_time_series_compaction_level_threshold()) { |
| tablet_meta.set_time_series_compaction_level_threshold( |
| tablet_meta_info.time_series_compaction_level_threshold()); |
| } else if (tablet_meta_info.has_disable_auto_compaction()) { |
| if (tablet_meta.has_schema() && tablet_meta.schema().column_size() > 0) { |
| tablet_meta.mutable_schema()->set_disable_auto_compaction( |
| tablet_meta_info.disable_auto_compaction()); |
| } else { |
| doris::TabletSchemaCloudPB schema_pb; |
| if (!try_fetch_and_parse_schema(txn.get(), &schema_pb, instance_id, |
| tablet_meta.index_id(), |
| tablet_meta.schema_version(), code, msg, |
| is_versioned_read, resource_mgr_.get())) { |
| return; |
| } |
| |
| schema_pb.set_disable_auto_compaction(tablet_meta_info.disable_auto_compaction()); |
| auto key = meta_schema_key( |
| {instance_id, tablet_meta.index_id(), tablet_meta.schema_version()}); |
| put_schema_kv(code, msg, txn.get(), key, schema_pb); |
| if (code != MetaServiceCode::OK) return; |
| if (is_versioned_write) { |
| auto key = versioned::meta_schema_key( |
| {instance_id, tablet_meta.index_id(), tablet_meta.schema_version()}); |
| put_versioned_schema_kv(code, msg, txn.get(), key, schema_pb); |
| if (code != MetaServiceCode::OK) return; |
| } |
| } |
| } |
| int64_t table_id = tablet_meta.table_id(); |
| int64_t index_id = tablet_meta.index_id(); |
| int64_t partition_id = tablet_meta.partition_id(); |
| int64_t tablet_id = tablet_meta.tablet_id(); |
| |
| MetaTabletKeyInfo key_info {instance_id, table_id, index_id, partition_id, tablet_id}; |
| std::string key; |
| std::string val; |
| meta_tablet_key(key_info, &key); |
| if (!tablet_meta.SerializeToString(&val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize tablet meta"; |
| return; |
| } |
| txn->put(key, val); |
| LOG(INFO) << "xxx put tablet_key=" << hex(key); |
| |
| if (is_versioned_write) { |
| update_tablet_log.add_tablet_ids(tablet_id); |
| |
| std::string tablet_meta_key = versioned::meta_tablet_key({instance_id, tablet_id}); |
| if (!versioned::document_put(txn.get(), tablet_meta_key, std::move(tablet_meta))) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = fmt::format( |
| "failed to serialize tablet meta for versioned write. tablet_id={}", |
| tablet_id); |
| return; |
| } |
| LOG(INFO) << "put versioned tablet meta, tablet_id=" << tablet_id |
| << " key=" << hex(tablet_meta_key); |
| } |
| } |
| |
| if (is_versioned_write && update_tablet_log.tablet_ids_size() > 0) { |
| OperationLogPB log; |
| log.mutable_update_tablet()->Swap(&update_tablet_log); |
| std::string log_key = versioned::log_key(instance_id); |
| versioned::blob_put(txn.get(), log_key, log); |
| LOG(INFO) << "put update tablet operation log, key=" << hex(log_key) |
| << " instance_id=" << instance_id; |
| } |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to update tablet meta, err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| void MetaServiceImpl::get_tablet(::google::protobuf::RpcController* controller, |
| const GetTabletRequest* request, GetTabletResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_tablet, get); |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| RPC_RATE_LIMIT(get_tablet) |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| if (!is_version_read_enabled(instance_id)) { |
| internal_get_tablet(code, msg, instance_id, txn.get(), request->tablet_id(), |
| response->mutable_tablet_meta(), false); |
| return; |
| } |
| |
| CloneChainReader reader(instance_id, resource_mgr_.get()); |
| TabletMetaCloudPB tablet_meta; |
| err = reader.get_tablet_meta(txn.get(), request->tablet_id(), &tablet_meta, nullptr); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet meta, tablet_id={}, err={}", request->tablet_id(), |
| err); |
| return; |
| } |
| |
| if (tablet_meta.has_schema() && |
| tablet_meta.schema().column_size() > 0) { // tablet meta saved before detach schema kv |
| tablet_meta.set_schema_version(tablet_meta.schema().schema_version()); |
| return; |
| } |
| |
| if (!tablet_meta.has_schema_version()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "tablet_meta must have either schema or schema_version"; |
| response->mutable_tablet_meta()->Clear(); |
| return; |
| } |
| err = reader.get_tablet_schema(txn.get(), tablet_meta.index_id(), tablet_meta.schema_version(), |
| tablet_meta.mutable_schema()); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet schema, tablet_id={}, err={}", request->tablet_id(), |
| err); |
| return; |
| } |
| response->mutable_tablet_meta()->Swap(&tablet_meta); |
| } |
| |
| static void set_schema_in_existed_rowset(MetaServiceCode& code, std::string& msg, Transaction* txn, |
| const std::string& instance_id, |
| doris::RowsetMetaCloudPB& rowset_meta, |
| doris::RowsetMetaCloudPB& existed_rowset_meta, |
| bool is_versioned_read, ResourceManager* resource_mgr) { |
| DCHECK(existed_rowset_meta.has_index_id()); |
| if (!existed_rowset_meta.has_schema_version()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "rowset_meta must have either schema or schema_version"; |
| return; |
| } |
| // Currently, schema version of `existed_rowset_meta` and `rowset_meta` MUST be equal |
| DCHECK_EQ(existed_rowset_meta.schema_version(), |
| rowset_meta.has_tablet_schema() ? rowset_meta.tablet_schema().schema_version() |
| : rowset_meta.schema_version()); |
| if (rowset_meta.has_tablet_schema() && |
| rowset_meta.tablet_schema().schema_version() == existed_rowset_meta.schema_version()) { |
| if (existed_rowset_meta.GetArena() && |
| rowset_meta.tablet_schema().GetArena() == existed_rowset_meta.GetArena()) { |
| existed_rowset_meta.unsafe_arena_set_allocated_tablet_schema( |
| rowset_meta.mutable_tablet_schema()); |
| } else { |
| existed_rowset_meta.mutable_tablet_schema()->CopyFrom(rowset_meta.tablet_schema()); |
| } |
| } else { |
| if (!try_fetch_and_parse_schema(txn, existed_rowset_meta.mutable_tablet_schema(), |
| instance_id, existed_rowset_meta.index_id(), |
| existed_rowset_meta.schema_version(), code, msg, |
| is_versioned_read, resource_mgr)) { |
| return; |
| } |
| } |
| } |
| |
| void scan_restore_job_rowset( |
| Transaction* txn, const std::string& instance_id, int64_t tablet_id, MetaServiceCode& code, |
| std::string& msg, |
| std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>* restore_job_rs_metas) { |
| std::stringstream ss; |
| JobRestoreRowsetKeyInfo rs_key_info0 {instance_id, tablet_id, 0}; |
| JobRestoreRowsetKeyInfo rs_key_info1 {instance_id, tablet_id + 1, 0}; |
| std::string restore_job_rs_key0; |
| std::string restore_job_rs_key1; |
| job_restore_rowset_key(rs_key_info0, &restore_job_rs_key0); |
| job_restore_rowset_key(rs_key_info1, &restore_job_rs_key1); |
| |
| int num_rowsets = 0; |
| std::unique_ptr<int, std::function<void(int*)>> defer_log_range( |
| (int*)0x01, [restore_job_rs_key0, restore_job_rs_key1, &num_rowsets](int*) { |
| LOG(INFO) << "get restore job rs meta, num_rowsets=" << num_rowsets << " range=[" |
| << hex(restore_job_rs_key0) << "," << hex(restore_job_rs_key1) << "]"; |
| }); |
| |
| std::unique_ptr<RangeGetIterator> it; |
| do { |
| TxnErrorCode err = txn->get(restore_job_rs_key0, restore_job_rs_key1, &it, true); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get restore job rs meta while committing," |
| << " tablet_id=" << tablet_id << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| while (it->has_next()) { |
| auto [k, v] = it->next(); |
| LOG(INFO) << "range_get restore_job_rs_key=" << hex(k) << " tablet_id=" << tablet_id; |
| restore_job_rs_metas->emplace_back(); |
| if (!restore_job_rs_metas->back().second.ParseFromArray(v.data(), v.size())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "malformed restore job rowset meta, tablet_id=" << tablet_id |
| << " key=" << hex(k); |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| restore_job_rs_metas->back().first = std::string(k.data(), k.size()); |
| ++num_rowsets; |
| if (!it->has_next()) restore_job_rs_key0 = k; |
| } |
| restore_job_rs_key0.push_back('\x00'); // Update to next smallest key for iteration |
| } while (it->more()); |
| return; |
| } |
| |
| /* |
| * Restore job state: |
| * +--------------+ |
| * | PREPARED | |
| * +--------------+ |
| * | | |
| * +-----------+ +-----------+ |
| * | commit | |
| * V | |
| * +--------------+ abort/expired | abort/expired |
| * | COMMITTED | ---------------+ | |
| * +--------------+ | | |
| * | | | |
| * | complete | | |
| * V V V |
| * +--------------+ +--------------+ |
| * | COMPLETED | | DROPPED | |
| * +--------------+ +--------------+ |
| * | | |
| * +----------+ +-----------+ |
| * recycle kv | | recycle kv & data |
| * V V |
| * +--------------+ |
| * | RECYCLING | |
| * +--------------+ |
| */ |
| void MetaServiceImpl::prepare_restore_job(::google::protobuf::RpcController* controller, |
| const RestoreJobRequest* request, |
| RestoreJobResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(prepare_restore_job); |
| if (request->action() != RestoreJobRequest::PREPARE) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invalid action, expected PREPARE but got " + |
| RestoreJobRequest::Action_Name(request->action()); |
| return; |
| } |
| |
| if (!request->has_tablet_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty tablet_id"; |
| return; |
| } |
| |
| if (!request->has_tablet_meta() || !request->tablet_meta().rs_metas_size()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = !request->has_tablet_meta() ? "no tablet meta" : "no rowset meta"; |
| return; |
| } |
| |
| if (!request->tablet_meta().has_schema() && !request->tablet_meta().has_schema_version()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "tablet_meta must have either schema or schema_version"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| |
| RPC_RATE_LIMIT(prepare_restore_job) |
| |
| std::unique_ptr<Transaction> txn0; |
| TxnErrorCode err = txn_kv_->create_txn(&txn0); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| |
| // validate request |
| TabletIndexPB tablet_idx; |
| get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(), tablet_idx); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| |
| auto key = job_restore_tablet_key({instance_id, tablet_idx.tablet_id()}); |
| std::string val; |
| err = txn0->get(key, &val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to check restore tablet {} existence, err={}", |
| tablet_idx.tablet_id(), err); |
| return; |
| } |
| |
| int64_t version = 0; |
| if (err == TxnErrorCode::TXN_OK) { |
| RestoreJobCloudPB restore_job_pb; |
| if (!restore_job_pb.ParseFromString(val)) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "malformed restore job"; |
| LOG_WARNING(msg); |
| return; |
| } |
| if (restore_job_pb.state() == RestoreJobCloudPB::RECYCLING) { |
| // request may arrive when recycle start |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("restore tablet {} is recycling, state: {}", tablet_idx.tablet_id(), |
| RestoreJobCloudPB::State_Name(restore_job_pb.state())); |
| return; |
| } else if (restore_job_pb.state() != RestoreJobCloudPB::PREPARED) { |
| // COMMITTED/DROPPED/COMPLETED state, should not happen |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("restore tablet {} already exists, state: {}", tablet_idx.tablet_id(), |
| RestoreJobCloudPB::State_Name(restore_job_pb.state())); |
| return; |
| } |
| version = restore_job_pb.version() + 1; |
| } |
| |
| TabletMetaCloudPB tablet_meta; |
| std::vector<doris::RowsetMetaCloudPB> rs_metas; |
| tablet_meta = request->tablet_meta(); |
| rs_metas.assign(tablet_meta.rs_metas().begin(), tablet_meta.rs_metas().end()); |
| tablet_meta.clear_rs_metas(); // strip off rs meta |
| |
| int64_t total_rowset_num = 0; |
| int64_t total_row_num = 0; |
| int64_t total_segment_num = 0; |
| int64_t total_disk_size = 0; |
| // 1. save restore job |
| RestoreJobCloudPB pb; |
| std::string to_save_val; |
| { |
| pb.set_tablet_id(tablet_idx.tablet_id()); |
| pb.mutable_tablet_meta()->Swap(&tablet_meta); |
| pb.set_ctime_s(::time(nullptr)); |
| pb.set_expired_at_s(request->expiration()); |
| pb.set_state(RestoreJobCloudPB::PREPARED); |
| total_rowset_num = rs_metas.size(); |
| for (const auto& rs_meta : rs_metas) { |
| total_row_num += rs_meta.num_rows(); |
| total_segment_num += rs_meta.num_segments(); |
| total_disk_size += rs_meta.total_disk_size(); |
| } |
| pb.set_total_rowset_num(total_rowset_num); |
| pb.set_total_row_num(total_row_num); |
| pb.set_total_segment_num(total_segment_num); |
| pb.set_total_disk_size(total_disk_size); |
| pb.set_committed_rowset_num(0); |
| pb.set_version(version); |
| pb.SerializeToString(&to_save_val); |
| } |
| LOG_INFO("prepare restore job") |
| .tag("job_restore_tablet_key", hex(key)) |
| .tag("tablet_id", tablet_idx.tablet_id()) |
| .tag("state", RestoreJobCloudPB::PREPARED) |
| .tag("total_rowset_num", total_rowset_num) |
| .tag("total_row_num", total_row_num) |
| .tag("total_segment_num", total_segment_num) |
| .tag("total_disk_size", total_disk_size) |
| .tag("version", version); |
| txn0->put(key, to_save_val); |
| err = txn0->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to commit txn: {}", err); |
| return; |
| } |
| |
| // 2. save restore rowsets |
| int64_t saved_rowset_num = 0; |
| int32_t max_batch_size = config::max_restore_job_rowsets_per_batch; |
| for (size_t i = 0; i < rs_metas.size(); i += max_batch_size) { |
| size_t end = (i + max_batch_size) > rs_metas.size() ? rs_metas.size() : i + max_batch_size; |
| std::vector<doris::RowsetMetaCloudPB> sub_restore_job_rs_metas(rs_metas.begin() + i, |
| rs_metas.begin() + end); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| for (auto& rowset_meta : sub_restore_job_rs_metas) { |
| // put restore rowset kv |
| std::string restore_job_rs_key; |
| std::string restore_job_rs_val; |
| JobRestoreRowsetKeyInfo rs_key_info {instance_id, tablet_idx.tablet_id(), |
| rowset_meta.end_version()}; |
| job_restore_rowset_key(rs_key_info, &restore_job_rs_key); |
| if (!rowset_meta.SerializeToString(&restore_job_rs_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize rowset meta"; |
| return; |
| } |
| txn->put(restore_job_rs_key, restore_job_rs_val); |
| LOG_INFO("put restore job rowset") |
| .tag("restore_job_rs_key", hex(restore_job_rs_key)) |
| .tag("tablet_id", tablet_idx.tablet_id()) |
| .tag("rowset_id", rowset_meta.rowset_id_v2()) |
| .tag("rowset_size", restore_job_rs_key.size() + restore_job_rs_val.size()) |
| .tag("rowset_meta", rowset_meta.DebugString()); |
| } |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to prepare restore job," |
| << " tablet_id=" << tablet_idx.tablet_id() |
| << " saved_rowset_num=" << saved_rowset_num |
| << " total_rowset_num=" << total_rowset_num << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| saved_rowset_num += sub_restore_job_rs_metas.size(); |
| } |
| } |
| |
| void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* controller, |
| const RestoreJobRequest* request, |
| RestoreJobResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(commit_restore_job); |
| if (request->action() != RestoreJobRequest::COMMIT) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invalid action, expected COMMIT but got " + |
| RestoreJobRequest::Action_Name(request->action()); |
| return; |
| } |
| |
| if (!request->has_tablet_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty tablet_id"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| |
| RPC_RATE_LIMIT(commit_restore_job) |
| |
| std::unique_ptr<Transaction> txn0; |
| TxnErrorCode err = txn_kv_->create_txn(&txn0); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| |
| TabletIndexPB tablet_idx; |
| get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(), tablet_idx); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| |
| // 1. get restore job |
| auto key = job_restore_tablet_key({instance_id, request->tablet_id()}); |
| std::string val; |
| err = txn0->get(key, &val); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "restore job not exists or has been recycled"; |
| return; |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to check restore job existence, err={}", err); |
| LOG_WARNING(msg); |
| return; |
| } |
| |
| RestoreJobCloudPB restore_job_pb; |
| if (!restore_job_pb.ParseFromString(val)) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "malformed restore job"; |
| LOG_WARNING(msg); |
| return; |
| } |
| |
| if (restore_job_pb.state() == RestoreJobCloudPB::COMMITTED) { |
| // duplicate request, previous request succeed, return ok |
| return; |
| } else if (restore_job_pb.state() == RestoreJobCloudPB::RECYCLING) { |
| // RECYCLING, request may arrive when recycle start |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("restore tablet {} is recycling, state: {}", tablet_idx.tablet_id(), |
| RestoreJobCloudPB::State_Name(restore_job_pb.state())); |
| return; |
| } else if (restore_job_pb.state() != RestoreJobCloudPB::PREPARED) { |
| // Only allow PREPARED -> COMMITTED |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("restore tablet {} with invalid state: {}", tablet_idx.tablet_id(), |
| RestoreJobCloudPB::State_Name(restore_job_pb.state())); |
| return; |
| } |
| |
| // 2. get restore rowsets |
| std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> restore_job_rs_metas; |
| scan_restore_job_rowset(txn0.get(), instance_id, request->tablet_id(), code, msg, |
| &restore_job_rs_metas); |
| if (code != MetaServiceCode::OK) { |
| LOG_WARNING("scan restore job rowset failed") |
| .tag("tablet_id", request->tablet_id()) |
| .tag("msg", msg) |
| .tag("code", code); |
| return; |
| } |
| |
| if (restore_job_pb.total_rowset_num() != restore_job_rs_metas.size()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("rowset num mismatch, total_rowset_num={}, rs_metas_size={}", |
| restore_job_pb.total_rowset_num(), restore_job_rs_metas.size()); |
| return; |
| } |
| |
| bool is_versioned_write = is_version_write_enabled(instance_id); |
| // 3. convert restore job rowsets to rs meta |
| TabletStats tablet_stat; |
| int64_t converted_rowset_num = 0; |
| int32_t max_batch_size = config::max_restore_job_rowsets_per_batch; |
| for (size_t i = 0; i < restore_job_rs_metas.size(); i += max_batch_size) { |
| size_t end = (i + max_batch_size) > restore_job_rs_metas.size() |
| ? restore_job_rs_metas.size() |
| : i + max_batch_size; |
| std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> sub_restore_job_rs_metas( |
| restore_job_rs_metas.begin() + i, restore_job_rs_metas.begin() + end); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| |
| // 3.1 put rs metas |
| for (auto& [_, rowset_meta] : sub_restore_job_rs_metas) { |
| if (config::write_schema_kv && rowset_meta.has_tablet_schema()) { |
| rowset_meta.set_index_id(tablet_idx.index_id()); |
| rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version()); |
| std::string schema_key = meta_schema_key( |
| {instance_id, rowset_meta.index_id(), rowset_meta.schema_version()}); |
| if (rowset_meta.has_variant_type_in_schema()) { |
| write_schema_dict(code, msg, instance_id, txn.get(), &rowset_meta); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| } |
| put_schema_kv(code, msg, txn.get(), schema_key, rowset_meta.tablet_schema()); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| if (is_versioned_write) { |
| std::string versioned_schema_key = versioned::meta_schema_key( |
| {instance_id, rowset_meta.index_id(), rowset_meta.schema_version()}); |
| put_versioned_schema_kv(code, msg, txn.get(), versioned_schema_key, |
| rowset_meta.tablet_schema()); |
| if (code != MetaServiceCode::OK) return; |
| } |
| rowset_meta.set_allocated_tablet_schema(nullptr); |
| } |
| // put rowset meta |
| std::string rs_key; |
| std::string rs_val; |
| MetaRowsetKeyInfo rs_key_info {instance_id, tablet_idx.tablet_id(), |
| rowset_meta.end_version()}; |
| meta_rowset_key(rs_key_info, &rs_key); |
| if (!rowset_meta.SerializeToString(&rs_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize rowset meta"; |
| return; |
| } |
| txn->put(rs_key, rs_val); |
| LOG_INFO("put rowset key") |
| .tag("rowset_key", hex(rs_key)) |
| .tag("tablet_id", tablet_idx.tablet_id()) |
| .tag("rowset_id", rowset_meta.rowset_id_v2()) |
| .tag("version", rowset_meta.end_version()) |
| .tag("rowset_size", rs_key.size() + rs_val.size()) |
| .tag("rowset_meta", rowset_meta.DebugString()); |
| |
| if (is_versioned_write) { |
| std::string meta_rowset_load_key = versioned::meta_rowset_load_key( |
| {instance_id, tablet_idx.tablet_id(), rowset_meta.end_version()}); |
| // Put versioned rowset compact metadata for new tablet's rowsets |
| LOG_INFO("put restore job's versioned meta rowset load key") |
| .tag("instance_id", instance_id) |
| .tag("meta_rowset_load_key", hex(meta_rowset_load_key)) |
| .tag("tablet_id", tablet_idx.tablet_id()); |
| // if will revert in drop_partition rpc if commit restore_job failed |
| if (!versioned::document_put(txn.get(), meta_rowset_load_key, |
| std::move(rowset_meta))) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = fmt::format("failed to serialize versioned rowset meta, key={}", |
| hex(meta_rowset_load_key)); |
| return; |
| } |
| } |
| |
| tablet_stat.data_size += rowset_meta.total_disk_size(); |
| tablet_stat.num_rows += rowset_meta.num_rows(); |
| tablet_stat.num_segs += rowset_meta.num_segments(); |
| tablet_stat.index_size += rowset_meta.index_disk_size(); |
| tablet_stat.segment_size += rowset_meta.data_disk_size(); |
| ++tablet_stat.num_rowsets; |
| } |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit restore job," |
| << " tablet_id=" << tablet_idx.tablet_id() |
| << " converted_rowset_num=" << converted_rowset_num << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| converted_rowset_num += sub_restore_job_rs_metas.size(); |
| } |
| |
| if (restore_job_pb.total_row_num() != tablet_stat.num_rows || |
| restore_job_pb.total_segment_num() != tablet_stat.num_segs || |
| restore_job_pb.total_disk_size() != tablet_stat.data_size) { |
| LOG_WARNING("Restore job verification failed:") |
| .tag("tablet_id", tablet_idx.tablet_id()) |
| .tag("expected_row", restore_job_pb.total_row_num()) |
| .tag("actual_row", tablet_stat.num_rows) |
| .tag("expected_segment", restore_job_pb.total_segment_num()) |
| .tag("actual_segment", tablet_stat.num_segs) |
| .tag("expected_data_size", restore_job_pb.total_disk_size()) |
| .tag("actual_data_size", tablet_stat.data_size); |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("commit restore job verification failed, tablet_id={}", |
| tablet_idx.tablet_id()); |
| return; |
| } |
| |
| // 4. convert restore tablet to tablet meta |
| TabletMetaCloudPB* tablet_meta = restore_job_pb.mutable_tablet_meta(); |
| DCHECK_EQ(tablet_meta->tablet_id(), tablet_idx.tablet_id()); |
| |
| DeleteBitmapPB* delete_bitmap = tablet_meta->mutable_delete_bitmap(); |
| auto store_version = request->has_store_version() ? request->store_version() : 1; |
| bool write_v1 = store_version == 1 || store_version == 3; |
| bool write_v2 = store_version == 2 || store_version == 3; |
| if (write_v1 && delete_bitmap->rowset_ids_size() > 0) { |
| txn0.reset(); |
| err = txn_kv_->create_txn(&txn0); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = fmt::format("failed to create txn, err={}", err); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| for (size_t i = 0; i < delete_bitmap->rowset_ids_size(); ++i) { |
| MetaDeleteBitmapInfo key_info {instance_id, tablet_meta->tablet_id(), |
| delete_bitmap->rowset_ids(i), delete_bitmap->versions(i), |
| delete_bitmap->segment_ids(i)}; |
| std::string key; |
| meta_delete_bitmap_key(key_info, &key); |
| auto& val = delete_bitmap->segment_delete_bitmaps(i); |
| |
| if (txn0->approximate_bytes() + key.size() * 3 + val.size() > |
| config::max_txn_commit_byte) { |
| err = txn0->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to update delete bitmap, tablet_id={}, err={}", |
| tablet_idx.tablet_id(), err); |
| return; |
| } |
| err = txn_kv_->create_txn(&txn0); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| } |
| txn0->put(key, val); |
| LOG_INFO("put delete bitmap key") |
| .tag("delete_bitmap_key", hex(key)) |
| .tag("tablet_id", tablet_idx.tablet_id()) |
| .tag("rowset_id", delete_bitmap->rowset_ids(i)) |
| .tag("version", delete_bitmap->versions(i)) |
| .tag("segment_id", delete_bitmap->segment_ids(i)) |
| .tag("delete_bitmap_size", key.size() + val.size()); |
| } |
| err = txn0->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to update delete bitmap, tablet_id={}, err={}", |
| tablet_idx.tablet_id(), err); |
| return; |
| } |
| } |
| |
| if (write_v2 && delete_bitmap->rowset_ids_size() > 0) { |
| txn0.reset(); |
| err = txn_kv_->create_txn(&txn0); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = fmt::format("failed to create txn, err={}", err); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| std::string pre_rowset_id = ""; |
| std::string cur_rowset_id = ""; |
| DeleteBitmapPB delete_bitmap_pb; |
| |
| auto store_delete_bitmap = [&](std::string rowset_id) { |
| DCHECK_NE(delete_bitmap_pb.rowset_ids_size(), 0); |
| std::string key; |
| versioned::MetaDeleteBitmapInfo key_info {instance_id, tablet_meta->tablet_id(), |
| rowset_id}; |
| versioned::meta_delete_bitmap_key(key_info, &key); |
| std::string val; |
| DeleteBitmapStoragePB delete_bitmap_storage; |
| delete_bitmap_storage.set_store_in_fdb(true); |
| *(delete_bitmap_storage.mutable_delete_bitmap()) = std::move(delete_bitmap_pb); |
| if (!delete_bitmap_storage.SerializeToString(&val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize delete bitmap storage"; |
| return; |
| } |
| |
| if (txn0->approximate_bytes() + key.size() * 3 + val.size() > |
| config::max_txn_commit_byte) { |
| cloud::blob_put(txn0.get(), key, val, 0); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format( |
| "failed to update versioned delete bitmap, tablet_id={}, err={}", |
| tablet_idx.tablet_id(), err); |
| return; |
| } |
| err = txn_kv_->create_txn(&txn0); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| } |
| cloud::blob_put(txn0.get(), key, val, 0); |
| LOG_INFO("put versioned delete bitmap key") |
| .tag("delete_bitmap_key", hex(key)) |
| .tag("tablet_id", tablet_idx.tablet_id()) |
| .tag("rowset_id", rowset_id) |
| .tag("delete_bitmap_num", |
| delete_bitmap_storage.delete_bitmap().rowset_ids_size()) |
| .tag("delete_bitmap_size", key.size() + val.size()); |
| }; |
| |
| for (size_t i = 0; i < delete_bitmap->rowset_ids_size(); ++i) { |
| cur_rowset_id = delete_bitmap->rowset_ids(i); |
| if (cur_rowset_id != pre_rowset_id) { |
| if (!pre_rowset_id.empty()) { |
| store_delete_bitmap(pre_rowset_id); |
| if (code != MetaServiceCode::OK) return; |
| } |
| pre_rowset_id = cur_rowset_id; |
| DCHECK_EQ(delete_bitmap_pb.rowset_ids_size(), 0); |
| DCHECK_EQ(delete_bitmap_pb.segment_ids_size(), 0); |
| DCHECK_EQ(delete_bitmap_pb.versions_size(), 0); |
| DCHECK_EQ(delete_bitmap_pb.segment_delete_bitmaps_size(), 0); |
| } |
| delete_bitmap_pb.add_rowset_ids(delete_bitmap->rowset_ids(i)); |
| delete_bitmap_pb.add_segment_ids(delete_bitmap->segment_ids(i)); |
| delete_bitmap_pb.add_versions(delete_bitmap->versions(i)); |
| delete_bitmap_pb.add_segment_delete_bitmaps(delete_bitmap->segment_delete_bitmaps(i)); |
| } |
| if (delete_bitmap_pb.rowset_ids_size() > 0) { |
| DCHECK(!cur_rowset_id.empty()); |
| store_delete_bitmap(cur_rowset_id); |
| if (code != MetaServiceCode::OK) return; |
| } |
| err = txn0->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to update versioned delete bitmap, tablet_id={}, err={}", |
| tablet_idx.tablet_id(), err); |
| return; |
| } |
| } |
| |
| txn0.reset(); |
| err = txn_kv_->create_txn(&txn0); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = fmt::format("failed to create txn, err={}", err); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| if (tablet_meta->has_schema()) { |
| // Temporary hard code to fix wrong column type string generated by FE |
| auto fix_column_type = [](doris::TabletSchemaCloudPB* schema) { |
| for (auto& column : *schema->mutable_column()) { |
| if (column.type() == "DECIMAL128") { |
| column.mutable_type()->push_back('I'); |
| } |
| } |
| }; |
| if (config::write_schema_kv) { |
| // detach TabletSchemaCloudPB from TabletMetaCloudPB |
| tablet_meta->set_schema_version(tablet_meta->schema().schema_version()); |
| fix_column_type(tablet_meta->mutable_schema()); |
| auto schema_key = meta_schema_key( |
| {instance_id, tablet_meta->index_id(), tablet_meta->schema_version()}); |
| put_schema_kv(code, msg, txn0.get(), schema_key, tablet_meta->schema()); |
| if (code != MetaServiceCode::OK) return; |
| |
| bool is_versioned_write = is_version_write_enabled(instance_id); |
| if (is_versioned_write) { |
| auto versioned_schema_key = versioned::meta_schema_key( |
| {instance_id, tablet_meta->index_id(), tablet_meta->schema_version()}); |
| put_versioned_schema_kv(code, msg, txn0.get(), versioned_schema_key, |
| tablet_meta->schema()); |
| if (code != MetaServiceCode::OK) return; |
| } |
| tablet_meta->set_allocated_schema(nullptr); |
| } else { |
| fix_column_type(tablet_meta->mutable_schema()); |
| } |
| } |
| |
| MetaTabletKeyInfo tablet_info {instance_id, tablet_meta->table_id(), tablet_meta->index_id(), |
| tablet_meta->partition_id(), tablet_meta->tablet_id()}; |
| std::string tablet_key; |
| std::string tablet_val; |
| meta_tablet_key(tablet_info, &tablet_key); |
| if (!tablet_meta->SerializeToString(&tablet_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize tablet meta"; |
| return; |
| } |
| txn0->put(tablet_key, tablet_val); |
| LOG_INFO("put tablet key") |
| .tag("tablet_key", hex(tablet_key)) |
| .tag("tablet_id", tablet_idx.tablet_id()) |
| .tag("tablet_size", tablet_key.size() + tablet_val.size()); |
| |
| if (is_versioned_write) { |
| std::string versioned_tablet_key = |
| versioned::meta_tablet_key({instance_id, tablet_meta->tablet_id()}); |
| TabletMetaCloudPB meta; |
| meta.CopyFrom(*tablet_meta); |
| if (!versioned::document_put(txn0.get(), versioned_tablet_key, std::move(meta))) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = fmt::format("failed to serialize versioned tablet meta, key={}", |
| hex(versioned_tablet_key)); |
| return; |
| } |
| LOG(INFO) << "put versioned tablet meta, tablet_id=" << tablet_meta->tablet_id() |
| << " key=" << hex(versioned_tablet_key); |
| } |
| |
| StatsTabletKeyInfo stat_info {instance_id, tablet_meta->table_id(), tablet_meta->index_id(), |
| tablet_meta->partition_id(), tablet_meta->tablet_id()}; |
| std::string stats_key; |
| std::string stats_val; |
| stats_tablet_key(stat_info, &stats_key); |
| TabletStatsPB stats_pb; |
| stats_pb.mutable_idx()->set_table_id(tablet_meta->table_id()); |
| stats_pb.mutable_idx()->set_index_id(tablet_meta->index_id()); |
| stats_pb.mutable_idx()->set_partition_id(tablet_meta->partition_id()); |
| stats_pb.mutable_idx()->set_tablet_id(tablet_meta->tablet_id()); |
| stats_pb.set_base_compaction_cnt(0); |
| stats_pb.set_cumulative_compaction_cnt(0); |
| stats_pb.set_cumulative_point(tablet_meta->cumulative_layer_point()); |
| stats_val = stats_pb.SerializeAsString(); |
| txn0->put(stats_key, stats_val); |
| LOG_INFO("put tablet stats") |
| .tag("tablet_id", tablet_meta->tablet_id()) |
| .tag("stats key", hex(stats_key)); |
| |
| if (is_versioned_write) { |
| TabletStatsPB tablet_load_stats; |
| auto tablet_load_stats_val = tablet_load_stats.SerializeAsString(); |
| std::string tablet_load_stats_version_key = |
| versioned::tablet_load_stats_key({instance_id, tablet_meta->tablet_id()}); |
| LOG_INFO("put versioned tablet load stats key") |
| .tag("tablet_load_stats_version_key", hex(tablet_load_stats_version_key)) |
| .tag("tablet_id", tablet_meta->tablet_id()) |
| .tag("value_size", tablet_load_stats_val.size()) |
| .tag("instance_id", instance_id); |
| versioned_put(txn0.get(), tablet_load_stats_version_key, tablet_load_stats_val); |
| |
| TabletStatsPB tablet_compact_stats; |
| tablet_compact_stats.CopyFrom(stats_pb); |
| auto tablet_compact_stats_val = tablet_compact_stats.SerializeAsString(); |
| std::string tablet_compact_stats_version_key = |
| versioned::tablet_compact_stats_key({instance_id, tablet_meta->tablet_id()}); |
| LOG_INFO("put versioned tablet compact stats key") |
| .tag("tablet_compact_stats_version_key", hex(tablet_compact_stats_version_key)) |
| .tag("tablet_id", tablet_meta->tablet_id()) |
| .tag("value_size", tablet_compact_stats_val.size()) |
| .tag("instance_id", instance_id); |
| versioned_put(txn0.get(), tablet_compact_stats_version_key, tablet_compact_stats_val); |
| } |
| update_tablet_stats(stat_info, tablet_stat, txn0, code, msg); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| |
| // 5. update restore job |
| std::string to_save_val; |
| restore_job_pb.set_mtime_s(::time(nullptr)); |
| restore_job_pb.set_state(RestoreJobCloudPB::COMMITTED); |
| restore_job_pb.set_committed_rowset_num(converted_rowset_num); |
| restore_job_pb.SerializeToString(&to_save_val); |
| |
| txn0->put(key, to_save_val); |
| LOG_INFO("commit restore job") |
| .tag("job_restore_tablet_key", hex(key)) |
| .tag("tablet_id", tablet_idx.tablet_id()) |
| .tag("state", restore_job_pb.state()) |
| .tag("mtime_s", restore_job_pb.mtime_s()) |
| .tag("committed_rowset_num", converted_rowset_num); |
| err = txn0->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to commit restore job: {}", err); |
| return; |
| } |
| } |
| |
| void MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* controller, |
| const RestoreJobRequest* request, |
| RestoreJobResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(finish_restore_job); |
| if (request->action() != RestoreJobRequest::COMPLETE && |
| request->action() != RestoreJobRequest::ABORT) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invalid action, expected COMPLETE or ABORT but got " + |
| RestoreJobRequest::Action_Name(request->action()); |
| return; |
| } |
| |
| if (!request->has_tablet_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty tablet_id"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| |
| RPC_RATE_LIMIT(finish_restore_job) |
| |
| std::unique_ptr<Transaction> txn0; |
| TxnErrorCode err = txn_kv_->create_txn(&txn0); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| |
| TabletIndexPB tablet_idx; |
| get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(), tablet_idx); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| |
| // 1. get restore job |
| auto key = job_restore_tablet_key({instance_id, request->tablet_id()}); |
| std::string val; |
| err = txn0->get(key, &val); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "restore job not exists or has been recycled"; |
| return; |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to check restore job existence, err={}", err); |
| LOG_WARNING(msg); |
| return; |
| } |
| |
| RestoreJobCloudPB restore_job_pb; |
| if (!restore_job_pb.ParseFromString(val)) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "malformed restore job"; |
| LOG_WARNING(msg); |
| return; |
| } |
| |
| if (restore_job_pb.state() == RestoreJobCloudPB::DROPPED || |
| restore_job_pb.state() == RestoreJobCloudPB::COMPLETED) { |
| LOG_INFO("restore job already finished") |
| .tag("job_restore_tablet_key", hex(key)) |
| .tag("tablet_id", tablet_idx.tablet_id()) |
| .tag("state", restore_job_pb.state()); |
| // already final state, return ok |
| return; |
| } else if (restore_job_pb.state() == RestoreJobCloudPB::RECYCLING) { |
| // RECYCLING, request may arrive when recycle start |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("restore tablet {} is recycling, state: {}", tablet_idx.tablet_id(), |
| RestoreJobCloudPB::State_Name(restore_job_pb.state())); |
| return; |
| } else { |
| // PREPARED, COMMITTED state |
| if (request->action() == RestoreJobRequest::COMPLETE && |
| restore_job_pb.state() != RestoreJobCloudPB::COMMITTED) { |
| // Only allow COMMITTED -> COMPLETED |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("restore tablet {} in invalid state to complete, state: {}", |
| tablet_idx.tablet_id(), |
| RestoreJobCloudPB::State_Name(restore_job_pb.state())); |
| return; |
| } |
| if (request->action() == RestoreJobRequest::ABORT && |
| (restore_job_pb.state() != RestoreJobCloudPB::PREPARED && |
| restore_job_pb.state() != RestoreJobCloudPB::COMMITTED)) { |
| // Only allow PREPARED/COMMITTED -> DROPPED |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("restore tablet {} in invalid state to abort, state: {}", |
| tablet_idx.tablet_id(), |
| RestoreJobCloudPB::State_Name(restore_job_pb.state())); |
| return; |
| } |
| } |
| |
| // 2. update restore job |
| std::string to_save_val; |
| bool is_completed = request->action() == RestoreJobRequest::COMPLETE; |
| restore_job_pb.set_state(is_completed ? RestoreJobCloudPB::COMPLETED |
| : RestoreJobCloudPB::DROPPED); |
| restore_job_pb.set_need_recycle_data(!is_completed); |
| restore_job_pb.SerializeToString(&to_save_val); |
| LOG_INFO("finish restore job") |
| .tag("job_restore_tablet_key", hex(key)) |
| .tag("tablet_id", tablet_idx.tablet_id()) |
| .tag("state", restore_job_pb.state()) |
| .tag("total_rowset_num", restore_job_pb.total_rowset_num()) |
| .tag("total_row_num", restore_job_pb.total_row_num()) |
| .tag("total_segment_num", restore_job_pb.total_segment_num()) |
| .tag("total_disk_size", restore_job_pb.total_disk_size()) |
| .tag("committed_rowset_num", restore_job_pb.committed_rowset_num()) |
| .tag("is_completed", is_completed); |
| txn0->put(key, to_save_val); |
| err = txn0->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to commit txn: {}", err); |
| return; |
| } |
| } |
| |
| /** |
| * Fills schema information into the rowset meta from the dictionary. |
| * Handles schemas with variant types by retrieving the complete schema from the dictionary. |
| * |
| * @param code Result code indicating the operation status. |
| * @param msg Error description for failed operations. |
| * @param instance_id Identifier for the specific instance. |
| * @param txn Pointer to the transaction object for transactional operations. |
| * @param existed_rowset_meta Rowset meta object to be updated with schema information. |
| */ |
| static void fill_schema_from_dict(MetaServiceCode& code, std::string& msg, |
| const std::string& instance_id, Transaction* txn, |
| doris::RowsetMetaCloudPB* existed_rowset_meta) { |
| google::protobuf::RepeatedPtrField<doris::RowsetMetaCloudPB> metas; |
| metas.Add()->CopyFrom(*existed_rowset_meta); |
| // Retrieve schema from the dictionary and update metas. |
| read_schema_dict(code, msg, instance_id, existed_rowset_meta->index_id(), txn, &metas, nullptr); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| // Update the original rowset meta with the complete schema from metas. |
| existed_rowset_meta->CopyFrom(metas.Get(0)); |
| } |
| |
| bool check_job_existed(Transaction* txn, MetaServiceCode& code, std::string& msg, |
| const std::string& instance_id, int64_t tablet_id, |
| const std::string& rowset_id, const std::string& job_id, |
| bool is_versioned_read, ResourceManager* resource_mgr) { |
| TabletIndexPB tablet_idx; |
| if (!is_versioned_read) { |
| get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx); |
| if (code != MetaServiceCode::OK) { |
| return false; |
| } |
| } else { |
| CloneChainReader reader(instance_id, resource_mgr); |
| TxnErrorCode err = reader.get_tablet_index(txn, tablet_id, &tablet_idx); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet index, tablet_id={} err={}", tablet_id, err); |
| return false; |
| } |
| } |
| |
| std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), |
| tablet_idx.partition_id(), tablet_id}); |
| std::string job_val; |
| auto err = txn->get(job_key, &job_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| std::stringstream ss; |
| ss << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,") |
| << " instance_id=" << instance_id << " tablet_id=" << tablet_id |
| << " rowset_id=" << rowset_id << " err=" << err; |
| msg = ss.str(); |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::STALE_PREPARE_ROWSET |
| : cast_as<ErrCategory::READ>(err); |
| return false; |
| } |
| |
| TabletJobInfoPB job_pb; |
| job_pb.ParseFromString(job_val); |
| bool match = false; |
| if (!job_pb.compaction().empty()) { |
| for (auto c : job_pb.compaction()) { |
| if (c.id() == job_id) { |
| match = true; |
| } |
| } |
| } |
| |
| if (job_pb.has_schema_change()) { |
| if (job_pb.schema_change().id() == job_id) { |
| match = true; |
| } |
| } |
| |
| if (!match) { |
| std::stringstream ss; |
| ss << " stale perpare rowset request," |
| << " instance_id=" << instance_id << " tablet_id=" << tablet_id << " job id=" << job_id |
| << " rowset_id=" << rowset_id; |
| msg = ss.str(); |
| code = MetaServiceCode::STALE_PREPARE_ROWSET; |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Check if the transaction status is as expected. |
| * If the transaction is not in the expected state, return false and set the error code and message. |
| * |
| * @param expect_status The expected transaction status. |
| * @param txn Pointer to the transaction object. |
| * @param instance_id The instance ID associated with the transaction. |
| * @param txn_id The transaction ID to check. |
| * @param code Reference to the error code to be set in case of failure. |
| * @param msg Reference to the error message to be set in case of failure. |
| * @return true if the transaction status matches the expected status, false otherwise. |
| */ |
| static bool check_transaction_status(TxnStatusPB expect_status, Transaction* txn, |
| const std::string& instance_id, int64_t txn_id, |
| MetaServiceCode& code, std::string& msg) { |
| // Get db id with txn id |
| std::string index_val; |
| const std::string index_key = txn_index_key({instance_id, txn_id}); |
| TxnErrorCode err = txn->get(index_key, &index_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id, err); |
| return false; |
| } |
| |
| TxnIndexPB index_pb; |
| if (!index_pb.ParseFromString(index_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id); |
| return false; |
| } |
| |
| DCHECK(index_pb.has_tablet_index() == true); |
| DCHECK(index_pb.tablet_index().has_db_id() == true); |
| if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) { |
| LOG(WARNING) << fmt::format( |
| "txn_index_pb is malformed, tablet_index has no db_id, txn_id={}", txn_id); |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("has no db_id in TxnIndexPB, txn_id={}", txn_id); |
| return false; |
| } |
| auto db_id = index_pb.tablet_index().db_id(); |
| txn_id = index_pb.has_parent_txn_id() ? index_pb.parent_txn_id() : txn_id; |
| |
| const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); |
| std::string info_val; |
| err = txn->get(info_key, &info_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get txn, txn_id={}, err={}", txn_id, err); |
| return false; |
| } |
| TxnInfoPB txn_info; |
| if (!txn_info.ParseFromString(info_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("failed to parse txn_info, db_id={} txn_id={}", db_id, txn_id); |
| return false; |
| } |
| if (txn_info.status() != expect_status) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format("txn is not in {} state, txn_id={}, txn_status={}", expect_status, txn_id, |
| txn_info.status()); |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * 1. Check and confirm tmp rowset kv does not exist |
| * 2. Construct recycle rowset kv which contains object path |
| * 3. Put recycle rowset kv |
| */ |
| void MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controller, |
| const CreateRowsetRequest* request, |
| CreateRowsetResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(prepare_rowset, get, put); |
| if (!request->has_rowset_meta()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "no rowset meta"; |
| return; |
| } |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| doris::RowsetMetaCloudPB rowset_meta(request->rowset_meta()); |
| if (!rowset_meta.has_tablet_schema() && !rowset_meta.has_schema_version()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "rowset_meta must have either schema or schema_version"; |
| return; |
| } |
| |
| RPC_RATE_LIMIT(prepare_rowset) |
| |
| int64_t tablet_id = rowset_meta.tablet_id(); |
| const auto& rowset_id = rowset_meta.rowset_id_v2(); |
| auto tmp_rs_key = meta_rowset_tmp_key({instance_id, rowset_meta.txn_id(), tablet_id}); |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| return; |
| } |
| |
| // Check if the compaction/sc tablet job has finished |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| if (config::enable_tablet_job_check && request->has_tablet_job_id() && |
| !request->tablet_job_id().empty()) { |
| if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, rowset_id, |
| request->tablet_job_id(), is_versioned_read, resource_mgr_.get())) { |
| return; |
| } |
| } |
| |
| // Check if the prepare rowset request is invalid. |
| // If the transaction has been finished, it means this prepare rowset is a timeout retry request. |
| // In this case, do not write the recycle key again, otherwise it may cause data loss. |
| // If the rowset had load id, it means it is a load request, otherwise it is a |
| // compaction/sc request. |
| if (config::enable_load_txn_status_check && rowset_meta.has_load_id() && |
| !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(), instance_id, |
| rowset_meta.txn_id(), code, msg)) { |
| LOG(WARNING) << "prepare rowset failed, txn_id=" << rowset_meta.txn_id() |
| << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id |
| << ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg; |
| return; |
| } |
| |
| // Check if commit key already exists. |
| std::string val; |
| err = txn->get(tmp_rs_key, &val); |
| if (err == TxnErrorCode::TXN_OK) { |
| auto existed_rowset_meta = response->mutable_existed_rowset_meta(); |
| if (!existed_rowset_meta->ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("malformed rowset meta value. key={}", hex(tmp_rs_key)); |
| return; |
| } |
| if (!existed_rowset_meta->has_index_id()) { |
| if (rowset_meta.has_index_id()) { |
| existed_rowset_meta->set_index_id(rowset_meta.index_id()); |
| } else if (!is_versioned_read) { |
| TabletIndexPB tablet_idx; |
| get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, tablet_idx); |
| if (code != MetaServiceCode::OK) return; |
| existed_rowset_meta->set_index_id(tablet_idx.index_id()); |
| rowset_meta.set_index_id(tablet_idx.index_id()); |
| } else { |
| CloneChainReader reader(instance_id, resource_mgr_.get()); |
| TabletIndexPB tablet_idx; |
| TxnErrorCode err = reader.get_tablet_index(txn.get(), tablet_id, &tablet_idx); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND |
| ? MetaServiceCode::TABLET_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet index, tablet_id={}, err={}", tablet_id, |
| err); |
| LOG(WARNING) << msg; |
| return; |
| } |
| existed_rowset_meta->set_index_id(tablet_idx.index_id()); |
| rowset_meta.set_index_id(tablet_idx.index_id()); |
| } |
| } |
| if (!existed_rowset_meta->has_tablet_schema()) { |
| set_schema_in_existed_rowset(code, msg, txn.get(), instance_id, rowset_meta, |
| *existed_rowset_meta, is_versioned_read, |
| resource_mgr_.get()); |
| if (code != MetaServiceCode::OK) return; |
| } else { |
| existed_rowset_meta->set_schema_version( |
| existed_rowset_meta->tablet_schema().schema_version()); |
| } |
| if (existed_rowset_meta->has_variant_type_in_schema()) { |
| fill_schema_from_dict(code, msg, instance_id, txn.get(), existed_rowset_meta); |
| if (code != MetaServiceCode::OK) return; |
| } |
| code = MetaServiceCode::ALREADY_EXISTED; |
| msg = "rowset already exists"; |
| return; |
| } |
| if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to check whether rowset exists, err={}", err); |
| return; |
| } |
| |
| auto prepare_rs_key = recycle_rowset_key({instance_id, tablet_id, rowset_id}); |
| RecycleRowsetPB prepare_rowset; |
| using namespace std::chrono; |
| int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); |
| prepare_rowset.set_creation_time(now); |
| prepare_rowset.set_expiration(request->rowset_meta().txn_expiration()); |
| // Schema is useless for PREPARE type recycle rowset, set it to null to reduce storage space |
| rowset_meta.set_allocated_tablet_schema(nullptr); |
| prepare_rowset.mutable_rowset_meta()->CopyFrom(rowset_meta); |
| prepare_rowset.set_type(RecycleRowsetPB::PREPARE); |
| prepare_rowset.SerializeToString(&val); |
| DCHECK_GT(prepare_rowset.expiration(), 0); |
| txn->put(prepare_rs_key, val); |
| std::size_t segment_key_bounds_bytes = get_segments_key_bounds_bytes(rowset_meta); |
| LOG(INFO) << "put prepare_rs_key " << hex(prepare_rs_key) << " value_size " << val.size() |
| << " txn_id " << request->txn_id() << " segment_key_bounds_bytes " |
| << segment_key_bounds_bytes; |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| if (err == TxnErrorCode::TXN_VALUE_TOO_LARGE) { |
| LOG(WARNING) << "failed to prepare rowset, err=value too large" |
| << ", txn_id=" << request->txn_id() << ", tablet_id=" << tablet_id |
| << ", rowset_id=" << rowset_id |
| << ", rowset_meta_bytes=" << rowset_meta.ByteSizeLong() |
| << ", segment_key_bounds_bytes=" << segment_key_bounds_bytes |
| << ", rowset_meta=" << rowset_meta.ShortDebugString(); |
| } |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to save recycle rowset, err={}", err); |
| return; |
| } |
| } |
| |
| /** |
| * 1. Check and confirm tmp rowset kv does not exist |
| * a. if exist |
| * 1. if tmp rowset is same with self, it may be a redundant |
| * retry request, return ok |
| * 2. else, abort commit_rowset |
| * b. else, goto 2 |
| * 2. Remove recycle rowset kv and put tmp rowset kv |
| */ |
| void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controller, |
| const CreateRowsetRequest* request, |
| CreateRowsetResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(commit_rowset, get, put, del); |
| if (!request->has_rowset_meta()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "no rowset meta"; |
| return; |
| } |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| doris::RowsetMetaCloudPB rowset_meta(request->rowset_meta()); |
| if (!rowset_meta.has_tablet_schema() && !rowset_meta.has_schema_version()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "rowset_meta must have either schema or schema_version"; |
| return; |
| } |
| RPC_RATE_LIMIT(commit_rowset) |
| |
| int64_t tablet_id = rowset_meta.tablet_id(); |
| const auto& rowset_id = rowset_meta.rowset_id_v2(); |
| |
| auto tmp_rs_key = meta_rowset_tmp_key({instance_id, rowset_meta.txn_id(), tablet_id}); |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| return; |
| } |
| |
| // Check if the compaction/sc tablet job has finished |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| if (config::enable_tablet_job_check && request->has_tablet_job_id() && |
| !request->tablet_job_id().empty()) { |
| if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, rowset_id, |
| request->tablet_job_id(), is_versioned_read, resource_mgr_.get())) { |
| return; |
| } |
| } |
| |
| // Check if the commit rowset request is invalid. |
| // If the transaction has been finished, it means this commit rowset is a timeout retry request. |
| // In this case, do not write the recycle key again, otherwise it may cause data loss. |
| // If the rowset has load id, it means it is a load request, otherwise it is a |
| // compaction/sc request. |
| if (config::enable_load_txn_status_check && rowset_meta.has_load_id() && |
| !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(), instance_id, |
| rowset_meta.txn_id(), code, msg)) { |
| LOG(WARNING) << "commit rowset failed, txn_id=" << rowset_meta.txn_id() |
| << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id |
| << ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg; |
| return; |
| } |
| |
| // Check if commit key already exists. |
| std::string existed_commit_val; |
| err = txn->get(tmp_rs_key, &existed_commit_val); |
| if (err == TxnErrorCode::TXN_OK) { |
| auto existed_rowset_meta = response->mutable_existed_rowset_meta(); |
| if (!existed_rowset_meta->ParseFromString(existed_commit_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("malformed rowset meta value. key={}", hex(tmp_rs_key)); |
| return; |
| } |
| if (existed_rowset_meta->rowset_id_v2() == rowset_meta.rowset_id_v2()) { |
| // Same request, return OK |
| response->set_allocated_existed_rowset_meta(nullptr); |
| return; |
| } |
| if (!existed_rowset_meta->has_index_id()) { |
| if (rowset_meta.has_index_id()) { |
| existed_rowset_meta->set_index_id(rowset_meta.index_id()); |
| } else if (!is_versioned_read) { |
| TabletIndexPB tablet_idx; |
| get_tablet_idx(code, msg, txn.get(), instance_id, rowset_meta.tablet_id(), |
| tablet_idx); |
| if (code != MetaServiceCode::OK) return; |
| existed_rowset_meta->set_index_id(tablet_idx.index_id()); |
| } else { |
| CloneChainReader reader(instance_id, resource_mgr_.get()); |
| TabletIndexPB tablet_idx; |
| TxnErrorCode err = reader.get_tablet_index(txn.get(), tablet_id, &tablet_idx); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND |
| ? MetaServiceCode::TABLET_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet index, tablet_id={}, err={}", tablet_id, |
| err); |
| LOG(WARNING) << msg; |
| return; |
| } |
| existed_rowset_meta->set_index_id(tablet_idx.index_id()); |
| } |
| } |
| if (!existed_rowset_meta->has_tablet_schema()) { |
| set_schema_in_existed_rowset(code, msg, txn.get(), instance_id, rowset_meta, |
| *existed_rowset_meta, is_versioned_read, |
| resource_mgr_.get()); |
| if (code != MetaServiceCode::OK) return; |
| } else { |
| existed_rowset_meta->set_schema_version( |
| existed_rowset_meta->tablet_schema().schema_version()); |
| } |
| if (existed_rowset_meta->has_variant_type_in_schema()) { |
| fill_schema_from_dict(code, msg, instance_id, txn.get(), existed_rowset_meta); |
| if (code != MetaServiceCode::OK) return; |
| } |
| code = MetaServiceCode::ALREADY_EXISTED; |
| msg = "rowset already exists"; |
| return; |
| } |
| if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to check whether rowset exists, err={}", err); |
| return; |
| } |
| // write schema kv if rowset_meta has schema |
| if (config::write_schema_kv && rowset_meta.has_tablet_schema()) { |
| if (!rowset_meta.has_index_id() && !is_versioned_read) { |
| TabletIndexPB tablet_idx; |
| get_tablet_idx(code, msg, txn.get(), instance_id, rowset_meta.tablet_id(), tablet_idx); |
| if (code != MetaServiceCode::OK) return; |
| rowset_meta.set_index_id(tablet_idx.index_id()); |
| } else if (!rowset_meta.has_index_id()) { |
| CloneChainReader reader(instance_id, resource_mgr_.get()); |
| TabletIndexPB tablet_idx; |
| TxnErrorCode err = |
| reader.get_tablet_index(txn.get(), rowset_meta.tablet_id(), &tablet_idx); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet index, tablet_id={}, err={}", |
| rowset_meta.tablet_id(), err); |
| LOG(WARNING) << msg; |
| return; |
| } |
| rowset_meta.set_index_id(tablet_idx.index_id()); |
| } |
| DCHECK(rowset_meta.tablet_schema().has_schema_version()); |
| DCHECK_GE(rowset_meta.tablet_schema().schema_version(), 0); |
| rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version()); |
| if (rowset_meta.has_variant_type_in_schema()) { |
| write_schema_dict(code, msg, instance_id, txn.get(), &rowset_meta); |
| if (code != MetaServiceCode::OK) return; |
| } |
| bool is_versioned_write = is_version_write_enabled(instance_id); |
| std::string schema_key = meta_schema_key( |
| {instance_id, rowset_meta.index_id(), rowset_meta.schema_version()}); |
| put_schema_kv(code, msg, txn.get(), schema_key, rowset_meta.tablet_schema()); |
| if (code != MetaServiceCode::OK) return; |
| if (is_versioned_write) { |
| std::string versioned_schema_key = versioned::meta_schema_key( |
| {instance_id, rowset_meta.index_id(), rowset_meta.schema_version()}); |
| put_versioned_schema_kv(code, msg, txn.get(), versioned_schema_key, |
| rowset_meta.tablet_schema()); |
| if (code != MetaServiceCode::OK) return; |
| } |
| rowset_meta.set_allocated_tablet_schema(nullptr); |
| } |
| |
| auto recycle_rs_key = recycle_rowset_key({instance_id, tablet_id, rowset_id}); |
| txn->remove(recycle_rs_key); |
| |
| DCHECK_GT(rowset_meta.txn_expiration(), 0); |
| auto tmp_rs_val = rowset_meta.SerializeAsString(); |
| txn->put(tmp_rs_key, tmp_rs_val); |
| |
| if (is_version_write_enabled(instance_id)) { |
| std::string rowset_ref_count_key = |
| versioned::data_rowset_ref_count_key({instance_id, tablet_id, rowset_id}); |
| LOG(INFO) << "add rowset ref count key, instance_id=" << instance_id |
| << "key=" << hex(rowset_ref_count_key); |
| txn->atomic_add(rowset_ref_count_key, 1); |
| |
| // Recycler uses end_version to check if the meta_rowset_compact_key or |
| // meta_rowset_load_key exists. |
| // The end_version is not set if rowset is committed by load, later commit_txn will write |
| // this key again to save end_version. |
| std::string meta_rowset_key = |
| versioned::meta_rowset_key({instance_id, tablet_id, rowset_id}); |
| if (config::enable_recycle_rowset_strip_key_bounds) { |
| doris::RowsetMetaCloudPB rowset_meta_copy = rowset_meta; |
| // Strip key bounds to shrink operation log for ts compaction recycle entries |
| rowset_meta_copy.clear_segments_key_bounds(); |
| rowset_meta_copy.clear_segments_key_bounds_truncated(); |
| blob_put(txn.get(), meta_rowset_key, rowset_meta_copy.SerializeAsString(), 0); |
| } else { |
| blob_put(txn.get(), meta_rowset_key, tmp_rs_val, 0); |
| } |
| LOG(INFO) << "put versioned meta_rowset_key=" << hex(meta_rowset_key); |
| } |
| |
| std::size_t segment_key_bounds_bytes = get_segments_key_bounds_bytes(rowset_meta); |
| LOG(INFO) << "put tmp_rs_key " << hex(tmp_rs_key) << " delete recycle_rs_key " |
| << hex(recycle_rs_key) << " value_size " << tmp_rs_val.size() << " txn_id " |
| << request->txn_id() << " segment_key_bounds_bytes " << segment_key_bounds_bytes; |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to save rowset meta, err=" << err; |
| if (err == TxnErrorCode::TXN_VALUE_TOO_LARGE) { |
| LOG(WARNING) << "failed to commit rowset, err=value too large" |
| << ", txn_id=" << request->txn_id() << ", tablet_id=" << tablet_id |
| << ", rowset_id=" << rowset_id |
| << ", rowset_meta_bytes=" << rowset_meta.ByteSizeLong() |
| << ", segment_key_bounds_bytes=" << segment_key_bounds_bytes |
| << ", num_segments=" << rowset_meta.num_segments() |
| << ", rowset_meta=" << rowset_meta.ShortDebugString(); |
| ss << ". The key column data is too large, or too many partitions are being loaded " |
| "simultaneously. Please reduce the size of the key column data or lower the " |
| "number of partitions involved in a single load or update."; |
| } |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| void MetaServiceImpl::update_tmp_rowset(::google::protobuf::RpcController* controller, |
| const CreateRowsetRequest* request, |
| CreateRowsetResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(update_tmp_rowset, get, put); |
| if (!request->has_rowset_meta()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "no rowset meta"; |
| return; |
| } |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| doris::RowsetMetaCloudPB rowset_meta(request->rowset_meta()); |
| if (!rowset_meta.has_tablet_schema() && !rowset_meta.has_schema_version()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "rowset_meta must have either schema or schema_version"; |
| return; |
| } |
| RPC_RATE_LIMIT(update_tmp_rowset) |
| int64_t tablet_id = rowset_meta.tablet_id(); |
| |
| std::string update_key; |
| std::string update_val; |
| |
| int64_t txn_id = rowset_meta.txn_id(); |
| MetaRowsetTmpKeyInfo key_info {instance_id, txn_id, tablet_id}; |
| meta_rowset_tmp_key(key_info, &update_key); |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| return; |
| } |
| |
| // Check if commit key already exists. |
| std::string existed_commit_val; |
| err = txn->get(update_key, &existed_commit_val); |
| if (err == TxnErrorCode::TXN_OK) { |
| auto existed_rowset_meta = response->mutable_existed_rowset_meta(); |
| if (!existed_rowset_meta->ParseFromString(existed_commit_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("malformed rowset meta value. key={}", hex(update_key)); |
| return; |
| } |
| } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = MetaServiceCode::ROWSET_META_NOT_FOUND; |
| LOG_WARNING( |
| "fail to find the rowset meta with key={}, instance_id={}, txn_id={}, " |
| "tablet_id={}, rowset_id={}", |
| hex(update_key), instance_id, rowset_meta.txn_id(), tablet_id, |
| rowset_meta.rowset_id_v2()); |
| msg = "can't find the rowset"; |
| return; |
| } else { |
| code = cast_as<ErrCategory::READ>(err); |
| LOG_WARNING( |
| "internal error, fail to find the rowset meta with key={}, instance_id={}, " |
| "txn_id={}, tablet_id={}, rowset_id={}", |
| hex(update_key), instance_id, rowset_meta.txn_id(), tablet_id, |
| rowset_meta.rowset_id_v2()); |
| msg = fmt::format("failed to check whether rowset exists, err={}", err); |
| return; |
| } |
| if (rowset_meta.has_variant_type_in_schema()) { |
| write_schema_dict(code, msg, instance_id, txn.get(), &rowset_meta); |
| if (code != MetaServiceCode::OK) return; |
| } |
| DCHECK_GT(rowset_meta.txn_expiration(), 0); |
| if (!rowset_meta.SerializeToString(&update_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize rowset meta"; |
| return; |
| } |
| |
| txn->put(update_key, update_val); |
| std::size_t segment_key_bounds_bytes = get_segments_key_bounds_bytes(rowset_meta); |
| LOG(INFO) << "xxx put " |
| << "update_rowset_key " << hex(update_key) << " value_size " << update_val.size() |
| << " segment_key_bounds_bytes " << segment_key_bounds_bytes; |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to update rowset meta, err=" << err; |
| if (err == TxnErrorCode::TXN_VALUE_TOO_LARGE) { |
| const auto& rowset_id = rowset_meta.rowset_id_v2(); |
| LOG(WARNING) << "failed to update tmp rowset, err=value too large" |
| << ", txn_id=" << request->txn_id() << ", tablet_id=" << tablet_id |
| << ", rowset_id=" << rowset_id |
| << ", rowset_meta_bytes=" << rowset_meta.ByteSizeLong() |
| << ", segment_key_bounds_bytes=" << segment_key_bounds_bytes |
| << ", rowset_meta=" << rowset_meta.ShortDebugString(); |
| ss << ". The key column data is too large, or too many partitions are being loaded " |
| "simultaneously. Please reduce the size of the key column data or lower the " |
| "number of partitions involved in a single load or update."; |
| } |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, |
| const std::string& instance_id, int64_t tablet_id, MetaServiceCode& code, |
| std::string& msg, GetRowsetResponse* response) { |
| LOG(INFO) << "get_rowset start=" << start << ", end=" |
| << " tablet_id=" << tablet_id; |
| MetaRowsetKeyInfo key_info0 {instance_id, tablet_id, start}; |
| MetaRowsetKeyInfo key_info1 {instance_id, tablet_id, end + 1}; |
| std::string key0; |
| std::string key1; |
| meta_rowset_key(key_info0, &key0); |
| meta_rowset_key(key_info1, &key1); |
| std::unique_ptr<RangeGetIterator> it; |
| |
| int num_rowsets = 0; |
| DORIS_CLOUD_DEFER_COPY(key0, key1) { |
| LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets << " range=[" << hex(key0) |
| << "," << hex(key1) << "]" |
| << " tablet_id=" << tablet_id; |
| }; |
| |
| std::stringstream ss; |
| do { |
| TxnErrorCode err = txn->get(key0, key1, &it); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "internal error, failed to get rowset, err=" << err << " tablet_id=" << tablet_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| while (it->has_next()) { |
| auto [k, v] = it->next(); |
| auto* rs = response->add_rowset_meta(); |
| auto byte_size = rs->ByteSizeLong(); |
| TEST_SYNC_POINT_CALLBACK("get_rowset:meta_exceed_limit", &byte_size); |
| if (byte_size + v.size() > std::numeric_limits<int32_t>::max()) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format( |
| "rowset meta exceeded 2G, unable to serialize, key={}. byte_size={} " |
| "tablet_id={}", |
| hex(k), byte_size, tablet_id); |
| LOG(WARNING) << msg; |
| return; |
| } |
| if (!rs->ParseFromArray(v.data(), v.size())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "malformed rowset meta, unable to serialize, tablet_id=" + |
| std::to_string(tablet_id); |
| LOG(WARNING) << msg << " key=" << hex(k); |
| return; |
| } |
| ++num_rowsets; |
| if (!it->has_next()) { |
| key0 = k; |
| } |
| } |
| key0.push_back('\x00'); // Update to next smallest key for iteration |
| } while (it->more()); |
| } |
| |
| std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t req_bc_cnt, int64_t bc_cnt, |
| int64_t req_cc_cnt, int64_t cc_cnt, |
| int64_t req_cp, int64_t cp, |
| int64_t req_fc_cnt, int64_t fc_cnt, |
| int64_t req_start, int64_t req_end) { |
| using Version = std::pair<int64_t, int64_t>; |
| // combine `v1` `v2` to `v1`, return true if success |
| static auto combine_if_overlapping = [](Version& v1, Version& v2) -> bool { |
| if (v1.second + 1 < v2.first || v2.second + 1 < v1.first) return false; |
| v1.first = std::min(v1.first, v2.first); |
| v1.second = std::max(v1.second, v2.second); |
| return true; |
| }; |
| // [xxx]: compacted versions |
| // ^~~~~: cumulative point |
| // ^___^: related versions |
| std::vector<Version> versions; |
| if (req_bc_cnt < bc_cnt) { |
| // * for any BC happended |
| // BE [=][=][=][=][=====][=][=] |
| // ^~~~~ req_cp |
| // MS [xxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=] |
| // ^~~~~~~ ms_cp |
| // ^_________________________^ versions_return: [0, ms_cp - 1] |
| versions.emplace_back(0, cp - 1); |
| } |
| |
| if (req_cc_cnt < cc_cnt) { |
| Version cc_version; |
| if (req_cp < cp && req_cc_cnt + 1 == cc_cnt && req_fc_cnt == fc_cnt) { |
| // * only one CC happened and CP changed, and no full compaction happened |
| // BE [=][=][=][=][=====][=][=] |
| // ^~~~~ req_cp |
| // MS [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=] |
| // ^~~~~~~ ms_cp |
| // ^____________^ related_versions: [req_cp, ms_cp - 1] |
| // |
| cc_version = {req_cp, cp - 1}; |
| } else { |
| // * more than one CC happened and CP changed |
| // BE [=][=][=][=][=====][=][=] |
| // ^~~~~ req_cp |
| // MS [=][=][=][=][xxxxxxxxxxxxxx][xxxxxxx][=][=] |
| // ^~~~~~~ ms_cp |
| // ^_____________________^ related_versions: [req_cp, max] |
| // |
| // * more than one CC happened and CP remain unchanged |
| // BE [=][=][=][=][=====][=][=] |
| // ^~~~~ req_cp |
| // MS [=][=][=][=][xxxxxxxxxxxxxx][xxxxxxx][=][=] |
| // ^~~~~~~~~~~~~~ ms_cp |
| // ^_____________________^ related_versions: [req_cp, max] |
| // there may be holes if we don't return all version |
| // after ms_cp, however it can be optimized. |
| // * one CC happened and CP changed, and full compaction happened |
| // BE [=][=][=][=][=][=][=][=][=][=] |
| // ^~~~~ req_cp |
| // MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=] |
| // ^~~~~~~ ms_cp |
| // ^___________________________^ related_versions: [req_cp, max] |
| // |
| cc_version = {req_cp, std::numeric_limits<int64_t>::max() - 1}; |
| } |
| if (versions.empty() || !combine_if_overlapping(versions.front(), cc_version)) { |
| versions.push_back(cc_version); |
| } |
| } |
| |
| Version query_version {req_start, req_end}; |
| bool combined = false; |
| for (auto& v : versions) { |
| if ((combined = combine_if_overlapping(v, query_version))) break; |
| } |
| if (!combined) { |
| versions.push_back(query_version); |
| } |
| std::sort(versions.begin(), versions.end(), |
| [](const Version& v1, const Version& v2) { return v1.first < v2.first; }); |
| return versions; |
| } |
| |
| void MetaServiceImpl::get_partition_pending_txn_id(std::string_view instance_id, int64_t db_id, |
| int64_t table_id, int64_t partition_id, |
| int64_t tablet_id, std::stringstream& ss, |
| MetaServiceCode& code, std::string& msg, |
| int64_t& first_txn_id, |
| int64_t& partition_version, Transaction* txn) { |
| std::string ver_val; |
| std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id}); |
| TxnErrorCode err = txn->get(ver_key, &ver_val); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| // No pending txn, return empty |
| first_txn_id = -1; |
| return; |
| } else if (TxnErrorCode::TXN_OK != err) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get partiton version, tablet_id=" << tablet_id << " key=" << hex(ver_key) |
| << " err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| VersionPB version_pb; |
| if (!version_pb.ParseFromString(ver_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse version pb db_id=" << db_id << " table_id=" << table_id |
| << " partition_id" << partition_id << " key=" << hex(ver_key); |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| if (version_pb.pending_txn_ids_size() > 0) { |
| DCHECK(version_pb.pending_txn_ids_size() == 1); |
| first_txn_id = version_pb.pending_txn_ids(0); |
| } else { |
| first_txn_id = -1; |
| } |
| partition_version = version_pb.version(); |
| } |
| |
| void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, |
| const GetRowsetRequest* request, GetRowsetResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_rowset, get); |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| RPC_RATE_LIMIT(get_rowset) |
| int64_t tablet_id = request->idx().has_tablet_id() ? request->idx().tablet_id() : -1; |
| if (tablet_id <= 0) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "no valid tablet_id given"; |
| return; |
| } |
| |
| if (!request->has_base_compaction_cnt() || !request->has_cumulative_compaction_cnt() || |
| !request->has_cumulative_point()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "no valid compaction_cnt or cumulative_point given, tablet_id=" + |
| std::to_string(tablet_id); |
| LOG(WARNING) << msg; |
| return; |
| } |
| int64_t req_bc_cnt = request->base_compaction_cnt(); |
| int64_t req_cc_cnt = request->cumulative_compaction_cnt(); |
| int64_t req_fc_cnt = request->has_full_compaction_cnt() ? request->full_compaction_cnt() : 0; |
| int64_t req_cp = request->cumulative_point(); |
| |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| CloneChainReader reader(instance_id, txn_kv_.get(), resource_mgr_.get()); |
| do { |
| TEST_SYNC_POINT_CALLBACK("get_rowset:begin", &tablet_id); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg; |
| return; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| }; |
| TabletIndexPB idx; |
| // Get tablet id index from kv |
| if (!is_versioned_read) { |
| get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, idx); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| } else { |
| err = reader.get_tablet_index(txn.get(), tablet_id, &idx); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get versioned tablet index, err={}, tablet_id={}", err, |
| tablet_id); |
| LOG(WARNING) << msg; |
| return; |
| } |
| } |
| DCHECK(request->has_idx()); |
| |
| if (idx.has_db_id() && config::advance_txn_lazy_commit_during_reads) { |
| // there is maybe a lazy commit txn when call get_rowset |
| // we need advance lazy commit txn here |
| int64_t first_txn_id = -1; |
| int64_t partition_version = -1; |
| if (!is_versioned_read) { |
| get_partition_pending_txn_id(instance_id, idx.db_id(), idx.table_id(), |
| idx.partition_id(), tablet_id, ss, code, msg, |
| first_txn_id, partition_version, txn.get()); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| response->set_partition_max_version(partition_version); |
| } else { |
| err = reader.get_partition_pending_txn_id(txn.get(), idx.partition_id(), |
| &first_txn_id, &partition_version); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format( |
| "failed to get versioned partition pending txn id, err={}, " |
| "partition_id={}, tablet_id={}", |
| err, idx.partition_id(), tablet_id); |
| LOG(WARNING) << msg; |
| return; |
| } |
| response->set_partition_max_version(partition_version); |
| } |
| if (first_txn_id >= 0) { |
| stats.get_bytes += txn->get_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| txn.reset(); |
| TEST_SYNC_POINT_CALLBACK("get_rowset::advance_last_pending_txn_id", &first_txn_id); |
| std::shared_ptr<TxnLazyCommitTask> task = |
| txn_lazy_committer_->submit(instance_id, first_txn_id); |
| |
| if (config::wait_txn_lazy_commit_during_reads) { |
| std::tie(code, msg) = task->wait(); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "advance_last_txn failed last_txn=" << first_txn_id |
| << " code=" << code << " msg=" << msg; |
| return; |
| } |
| continue; |
| } |
| } |
| } |
| |
| // TODO(plat1ko): Judge if tablet has been dropped (in dropped index/partition) |
| |
| TabletStatsPB tablet_stat; |
| if (!is_versioned_read) { |
| internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, tablet_stat); |
| if (code != MetaServiceCode::OK) return; |
| } else { |
| err = reader.get_tablet_compact_stats(txn.get(), tablet_id, &tablet_stat, nullptr); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet compact stats, err={}, tablet_id={}", err, |
| tablet_id); |
| LOG(WARNING) << msg; |
| return; |
| } |
| } |
| VLOG_DEBUG << "tablet_id=" << tablet_id << " stats=" << proto_to_json(tablet_stat); |
| |
| int64_t bc_cnt = tablet_stat.base_compaction_cnt(); |
| int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt(); |
| int64_t fc_cnt = |
| tablet_stat.has_full_compaction_cnt() ? tablet_stat.full_compaction_cnt() : 0; |
| int64_t cp = tablet_stat.cumulative_point(); |
| |
| response->mutable_stats()->CopyFrom(tablet_stat); |
| |
| int64_t req_start = request->start_version(); |
| int64_t req_end = request->end_version(); |
| req_end = req_end < 0 ? std::numeric_limits<int64_t>::max() - 1 : req_end; |
| |
| //========================================================================== |
| // Find version ranges to be synchronized due to compaction |
| //========================================================================== |
| if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp || req_fc_cnt > fc_cnt) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "no valid compaction_cnt or cumulative_point given. req_bc_cnt=" << req_bc_cnt |
| << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", cc_cnt=" << cc_cnt |
| << " req_fc_cnt=" << req_fc_cnt << ", fc_cnt=" << fc_cnt << ", req_cp=" << req_cp |
| << ", cp=" << cp << " tablet_id=" << tablet_id; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, |
| req_fc_cnt, fc_cnt, req_start, req_end); |
| if (!is_versioned_read) { |
| for (auto [start, end] : versions) { |
| internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, code, msg, |
| response); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| } |
| } else { |
| for (auto [start, end] : versions) { |
| std::vector<RowsetMetaCloudPB> rowset_metas; |
| TxnErrorCode err = |
| reader.get_rowset_metas(txn.get(), tablet_id, start, end, &rowset_metas); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format( |
| "failed to get versioned rowset, err={}, tablet_id={}, version=[{}-{}]", |
| err, tablet_id, start, end); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| std::move(rowset_metas.begin(), rowset_metas.end(), |
| google::protobuf::RepeatedPtrFieldBackInserter( |
| response->mutable_rowset_meta())); |
| } |
| } |
| |
| // get referenced schema |
| std::unordered_map<int32_t, doris::TabletSchemaCloudPB*> version_to_schema; |
| for (auto& rowset_meta : *response->mutable_rowset_meta()) { |
| if (rowset_meta.has_tablet_schema()) { |
| version_to_schema.emplace(rowset_meta.tablet_schema().schema_version(), |
| rowset_meta.mutable_tablet_schema()); |
| rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version()); |
| } |
| rowset_meta.set_index_id(idx.index_id()); |
| } |
| bool need_read_schema_dict = false; |
| auto arena = response->GetArena(); |
| for (auto& rowset_meta : *response->mutable_rowset_meta()) { |
| if (rowset_meta.has_schema_dict_key_list()) { |
| need_read_schema_dict = true; |
| } |
| if (rowset_meta.has_tablet_schema()) continue; |
| if (!rowset_meta.has_schema_version()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = fmt::format( |
| "rowset_meta must have either schema or schema_version, " |
| "rowset_version=[{}-{}]", |
| rowset_meta.start_version(), rowset_meta.end_version()); |
| return; |
| } |
| if (auto it = version_to_schema.find(rowset_meta.schema_version()); |
| it != version_to_schema.end()) { |
| if (arena != nullptr) { |
| rowset_meta.set_allocated_tablet_schema(it->second); |
| } else { |
| rowset_meta.mutable_tablet_schema()->CopyFrom(*it->second); |
| } |
| } else { |
| if (!try_fetch_and_parse_schema(txn.get(), rowset_meta.mutable_tablet_schema(), |
| instance_id, idx.index_id(), |
| rowset_meta.schema_version(), code, msg, |
| is_versioned_read, resource_mgr_.get())) { |
| return; |
| } |
| version_to_schema.emplace(rowset_meta.schema_version(), |
| rowset_meta.mutable_tablet_schema()); |
| } |
| } |
| |
| if (need_read_schema_dict && request->schema_op() != GetRowsetRequest::NO_DICT) { |
| read_schema_dict(code, msg, instance_id, idx.index_id(), txn.get(), |
| response->mutable_rowset_meta(), response->mutable_schema_dict(), |
| request->schema_op()); |
| if (code != MetaServiceCode::OK) return; |
| } |
| TEST_SYNC_POINT_CALLBACK("get_rowset::finish", &response); |
| break; |
| } while (true); |
| } |
| |
| void MetaServiceImpl::get_tablet_stats(::google::protobuf::RpcController* controller, |
| const GetTabletStatsRequest* request, |
| GetTabletStatsResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_tablet_stats, get); |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| RPC_RATE_LIMIT(get_tablet_stats) |
| |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| CloneChainReader reader(instance_id, txn_kv_.get(), resource_mgr_.get()); |
| for (auto& i : request->tablet_idx()) { |
| TabletIndexPB idx(i); |
| // FIXME(plat1ko): Get all tablet stats in one txn |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = fmt::format("failed to create txn, tablet_id={}", idx.tablet_id()); |
| return; |
| } |
| DORIS_CLOUD_DEFER { |
| stats.get_bytes += txn->get_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| // the txn is not a local variable, if not reset will count last res twice |
| txn.reset(nullptr); |
| }; |
| |
| auto tablet_stats = response->add_tablet_stats(); |
| if (!is_versioned_read) { |
| if (!(/* idx.has_db_id() && */ idx.has_table_id() && idx.has_index_id() && |
| idx.has_partition_id() && i.has_tablet_id())) { |
| get_tablet_idx(code, msg, txn.get(), instance_id, idx.tablet_id(), idx); |
| if (code != MetaServiceCode::OK) return; |
| } |
| internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, *tablet_stats, true); |
| if (code != MetaServiceCode::OK) { |
| response->clear_tablet_stats(); |
| break; |
| } |
| } else { |
| TxnErrorCode err = reader.get_tablet_merged_stats(txn.get(), idx.tablet_id(), |
| tablet_stats, nullptr); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get versioned tablet stats, err={}, tablet_id={}", err, |
| idx.tablet_id()); |
| LOG(WARNING) << msg; |
| response->clear_tablet_stats(); |
| return; |
| } |
| } |
| #ifdef NDEBUG |
| // Force data size >= 0 to reduce the losses caused by bugs |
| if (tablet_stats->data_size() < 0) tablet_stats->set_data_size(0); |
| if (tablet_stats->index_size() < 0) tablet_stats->set_index_size(0); |
| if (tablet_stats->segment_size() < 0) tablet_stats->set_segment_size(0); |
| #endif |
| } |
| } |
| |
| static bool check_delete_bitmap_lock(MetaServiceCode& code, std::string& msg, std::stringstream& ss, |
| std::unique_ptr<Transaction>& txn, std::string& instance_id, |
| int64_t table_id, int64_t lock_id, int64_t lock_initiator, |
| std::string& lock_key, DeleteBitmapUpdateLockPB& lock_info, |
| std::string use_version, std::string log = "") { |
| std::string lock_val; |
| LOG(INFO) << "check_delete_bitmap_lock, table_id=" << table_id << " lock_id=" << lock_id |
| << " initiator=" << lock_initiator << " key=" << hex(lock_key) << log |
| << " use_version=" << use_version; |
| auto err = txn->get(lock_key, &lock_val); |
| TEST_SYNC_POINT_CALLBACK("check_delete_bitmap_lock.inject_get_lock_key_err", &err); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| msg = "lock id key not found"; |
| code = MetaServiceCode::LOCK_EXPIRED; |
| return false; |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| ss << "failed to get delete bitmap lock info, err=" << err; |
| msg = ss.str(); |
| code = cast_as<ErrCategory::READ>(err); |
| return false; |
| } |
| if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse DeleteBitmapUpdateLockPB"; |
| return false; |
| } |
| TEST_SYNC_POINT_CALLBACK("check_delete_bitmap_lock.set_lock_info", &lock_info); |
| if (lock_info.lock_id() != lock_id) { |
| ss << "lock id not match, locked by lock_id=" << lock_info.lock_id(); |
| msg = ss.str(); |
| code = MetaServiceCode::LOCK_EXPIRED; |
| return false; |
| } |
| if (use_version == "v2" && is_job_delete_bitmap_lock_id(lock_id)) { |
| std::string tablet_job_key = mow_tablet_job_key({instance_id, table_id, lock_initiator}); |
| std::string tablet_job_val; |
| err = txn->get(tablet_job_key, &tablet_job_val); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| ss << "tablet job key not found, table_id=" << table_id << " lock_id=" << lock_id |
| << " initiator=" << lock_initiator; |
| msg = ss.str(); |
| code = MetaServiceCode::LOCK_EXPIRED; |
| return false; |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| ss << "failed to get mow tablet job info, err=" << err; |
| msg = ss.str(); |
| code = cast_as<ErrCategory::READ>(err); |
| return false; |
| } |
| // not check expired time |
| return true; |
| } else { |
| bool found = false; |
| for (auto initiator : lock_info.initiators()) { |
| if (lock_initiator == initiator) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) { |
| msg = "lock initiator not exist"; |
| code = MetaServiceCode::LOCK_EXPIRED; |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| void MetaServiceImpl::get_delete_bitmap_lock_version(std::string& use_version, |
| std::string& instance_id) { |
| use_version = delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id); |
| } |
| |
| static bool remove_pending_delete_bitmap(MetaServiceCode& code, std::string& msg, |
| std::stringstream& ss, std::unique_ptr<Transaction>& txn, |
| std::string& instance_id, int64_t tablet_id) { |
| std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id}); |
| std::string pending_val; |
| auto err = txn->get(pending_key, &pending_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| ss << "failed to get delete bitmap pending info, instance_id=" << instance_id |
| << " tablet_id=" << tablet_id << " key=" << hex(pending_key) << " err=" << err; |
| msg = ss.str(); |
| code = cast_as<ErrCategory::READ>(err); |
| return false; |
| } |
| |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| return true; |
| } |
| |
| // delete delete bitmap of expired txn |
| PendingDeleteBitmapPB pending_info; |
| if (!pending_info.ParseFromString(pending_val)) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse PendingDeleteBitmapPB"; |
| return false; |
| } |
| for (auto& delete_bitmap_key : pending_info.delete_bitmap_keys()) { |
| // FIXME: Don't expose the implementation details of splitting large value |
| // remove large value (>90*1000) |
| std::string end_key = delete_bitmap_key; |
| encode_int64(INT64_MAX, &end_key); |
| txn->remove(delete_bitmap_key, end_key); |
| LOG(INFO) << "xxx remove pending delete bitmap, delete_bitmap_key=" |
| << hex(delete_bitmap_key); |
| } |
| return true; |
| } |
| |
| // When a load txn retries in publish phase with different version to publish, it will gain delete bitmap lock |
| // many times. these locks are *different*, but they are the same in the current implementation because they have |
| // the same lock_id and initiator and don't have version info. If some delete bitmap calculation task with version X |
| // on BE lasts long and try to update delete bitmaps on MS when the txn gains the lock in later retries |
| // with version Y(Y > X) to publish. It may wrongly update version X's delete bitmaps because the lock don't have version info. |
| // |
| // This function checks whether the partition version is correct when updating the delete bitmap |
| // to avoid wrongly update an visible version's delete bitmaps. |
| // 1. get the db id with txn id |
| // 2. get the partition version with db id, table id and partition id |
| // 3. check if the partition version matches the updating version |
| static bool check_partition_version_when_update_delete_bitmap( |
| MetaServiceCode& code, std::string& msg, std::unique_ptr<Transaction>& txn, |
| std::string& instance_id, int64_t table_id, int64_t partition_id, int64_t tablet_id, |
| int64_t txn_id, int64_t next_visible_version, bool is_versioned_read, |
| ResourceManager* resource_mgr) { |
| if (partition_id <= 0) { |
| LOG(WARNING) << fmt::format( |
| "invalid partition_id, skip to check partition version. txn={}, " |
| "table_id={}, partition_id={}, tablet_id={}", |
| txn_id, table_id, partition_id, tablet_id); |
| return true; |
| } |
| // Get db id with txn id |
| std::string index_val; |
| const std::string index_key = txn_index_key({instance_id, txn_id}); |
| auto err = txn->get(index_key, &index_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id, err); |
| LOG(WARNING) << msg; |
| return false; |
| } |
| |
| TxnIndexPB index_pb; |
| if (!index_pb.ParseFromString(index_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id); |
| LOG(WARNING) << msg; |
| return false; |
| } |
| |
| DCHECK(index_pb.has_tablet_index()) |
| << fmt::format("txn={}, table_id={}, partition_id={}, tablet_id={}, index_pb={}", |
| txn_id, table_id, partition_id, tablet_id, proto_to_json(index_pb)); |
| DCHECK(index_pb.tablet_index().has_db_id()) |
| << fmt::format("txn={}, table_id={}, partition_id={}, tablet_id={}, index_pb={}", |
| txn_id, table_id, partition_id, tablet_id, proto_to_json(index_pb)); |
| if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) { |
| LOG(WARNING) << fmt::format( |
| "has no db_id in TxnIndexPB, skip to check partition version. txn={}, " |
| "table_id={}, partition_id={}, tablet_id={}, index_pb={}", |
| txn_id, table_id, partition_id, tablet_id, proto_to_json(index_pb)); |
| return true; |
| } |
| int64_t cur_max_version {-1}; |
| if (!is_versioned_read) { |
| int64_t db_id = index_pb.tablet_index().db_id(); |
| std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id}); |
| std::string ver_val; |
| err = txn->get(ver_key, &ver_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get partition version, txn_id={}, tablet={}, err={}", |
| txn_id, tablet_id, err); |
| LOG(WARNING) << msg; |
| return false; |
| } |
| |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| cur_max_version = 1; |
| } else { |
| VersionPB version_pb; |
| if (!version_pb.ParseFromString(ver_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("failed to parse version_pb, txn_id={}, tablet={}, key={}", |
| txn_id, tablet_id, hex(ver_key)); |
| LOG(WARNING) << msg; |
| return false; |
| } |
| DCHECK(version_pb.has_version()); |
| cur_max_version = version_pb.version(); |
| |
| if (version_pb.pending_txn_ids_size() > 0) { |
| DCHECK(version_pb.pending_txn_ids_size() == 1); |
| cur_max_version += version_pb.pending_txn_ids_size(); |
| } |
| } |
| } else { |
| CloneChainReader reader(instance_id, resource_mgr); |
| VersionPB version_pb; |
| TxnErrorCode err = |
| reader.get_partition_version(txn.get(), partition_id, &version_pb, nullptr); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| cur_max_version = 1; // no version, set to 1 |
| } else if (err == TxnErrorCode::TXN_OK) { |
| cur_max_version = version_pb.version(); |
| if (version_pb.pending_txn_ids_size() > 0) { |
| DCHECK(version_pb.pending_txn_ids_size() == 1); |
| cur_max_version += version_pb.pending_txn_ids_size(); |
| } |
| } else { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get versioned partition version, err={}, partition_id={}", |
| err, partition_id); |
| LOG(WARNING) << msg; |
| return false; |
| } |
| } |
| |
| if (cur_max_version + 1 != next_visible_version) { |
| code = MetaServiceCode::VERSION_NOT_MATCH; |
| msg = fmt::format( |
| "check version failed when update_delete_bitmap, txn={}, table_id={}, " |
| "partition_id={}, tablet_id={}, found partition's max version is {}, but " |
| "request next_visible_version is {}", |
| txn_id, table_id, partition_id, tablet_id, cur_max_version, next_visible_version); |
| return false; |
| } |
| return true; |
| } |
| |
| // return false if the delete bitmap key already exists |
| [[maybe_unused]] static bool check_delete_bitmap_kv_exists(MetaServiceCode& code, std::string& msg, |
| std::unique_ptr<Transaction>& txn, |
| const std::string& key, |
| std::string& instance_id, |
| int64_t tablet_id, int64_t lock_id) { |
| std::string start_key {key}; |
| std::string end_key {start_key}; |
| encode_int64(INT64_MAX, &end_key); |
| std::unique_ptr<RangeGetIterator> it; |
| TxnErrorCode err = txn->get(start_key, end_key, &it); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get delete bitmap, err={}, tablet_id={}, lock_id={}", err, |
| tablet_id, lock_id); |
| return true; // skip to check if failed to get delete bitmap |
| } |
| if (it->has_next()) { |
| auto [k, v] = it->next(); |
| std::string_view k1 = k; |
| k1.remove_prefix(1); |
| std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; |
| decode_key(&k1, &out); |
| // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap |
| auto rowset_id = std::get<std::string>(std::get<0>(out[4])); |
| auto version = std::get<std::int64_t>(std::get<0>(out[5])); |
| auto segment_id = std::get<std::int64_t>(std::get<0>(out[6])); |
| |
| code = MetaServiceCode::UPDATE_OVERRIDE_EXISTING_KV; |
| msg = fmt::format( |
| "trying to update a existing delete bitmap KV, tablet_id={}, rowset_id={}, " |
| "version={}, segment_id={}, lock_id={}, instance_id={}", |
| tablet_id, rowset_id, version, segment_id, lock_id, instance_id); |
| LOG_WARNING(msg); |
| return false; |
| } |
| return true; |
| } |
| |
| struct UpdateDeleteBitmapTxnStats { |
| size_t current_key_count = 0; |
| size_t current_value_count = 0; |
| size_t total_key_count = 0; |
| size_t total_value_count = 0; |
| size_t total_txn_put_keys = 0; |
| size_t total_txn_put_bytes = 0; |
| size_t total_txn_size = 0; |
| size_t total_txn_count = 0; |
| }; |
| |
| void _write_delete_bitmap_kvs(MetaServiceCode& code, std::string& msg, std::stringstream& ss, |
| std::shared_ptr<TxnKv> txn_kv, std::unique_ptr<Transaction>& txn, |
| std::string& use_version, std::set<std::string>& non_exist_rowset_ids, |
| size_t i, std::string& log, KVStats& stats, |
| const UpdateDeleteBitmapRequest* request, std::string& instance_id, |
| std::string& key, const std::string& val, |
| UpdateDeleteBitmapTxnStats& txn_stats, bool is_versioned_read, |
| ResourceManager* resource_mgr) { |
| auto table_id = request->table_id(); |
| auto tablet_id = request->tablet_id(); |
| bool without_lock = request->has_without_lock() ? request->without_lock() : false; |
| // Split into multiple fdb transactions, because the size of one fdb |
| // transaction can't exceed 10MB. |
| if (txn->approximate_bytes() + key.size() * 3 + val.size() > config::max_txn_commit_byte) { |
| LOG(INFO) << "fdb txn size more than " << config::max_txn_commit_byte |
| << ", current size: " << txn->approximate_bytes() << ", tablet_id: " << tablet_id |
| << " lock_id=" << request->lock_id() << " initiator=" << request->initiator() |
| << ", need to commit"; |
| TxnErrorCode err = txn->commit(); |
| TEST_SYNC_POINT_CALLBACK("update_delete_bitmap:commit:err", request->initiator(), i, &err); |
| txn_stats.total_txn_put_keys += txn->num_put_keys(); |
| txn_stats.total_txn_put_bytes += txn->put_bytes(); |
| txn_stats.total_txn_size += txn->approximate_bytes(); |
| txn_stats.total_txn_count++; |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to update delete bitmap, err=" << err << " tablet_id=" << tablet_id |
| << " lock_id=" << request->lock_id() << " initiator=" << request->initiator() |
| << " delete_bitmap_key=" << txn_stats.current_key_count |
| << " delete_bitmap_value=" << txn_stats.current_value_count |
| << " put_size=" << txn->put_bytes() << " num_put_keys=" << txn->num_put_keys() |
| << " txn_size=" << txn->approximate_bytes(); |
| msg = ss.str(); |
| g_bvar_update_delete_bitmap_fail_counter << 1; |
| return; |
| } |
| stats.get_bytes += txn->get_bytes(); |
| stats.put_bytes += txn->put_bytes(); |
| stats.del_bytes += txn->delete_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| stats.put_counter += txn->num_put_keys(); |
| stats.del_counter += txn->num_del_keys(); |
| txn_stats.current_key_count = 0; |
| txn_stats.current_value_count = 0; |
| err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| if (!without_lock) { |
| std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); |
| DeleteBitmapUpdateLockPB lock_info; |
| if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, |
| request->lock_id(), request->initiator(), lock_key, |
| lock_info, use_version, log)) { |
| LOG(WARNING) << "failed to check delete bitmap lock, table_id=" << table_id |
| << " request lock_id=" << request->lock_id() |
| << " request initiator=" << request->initiator() << " msg " << msg; |
| return; |
| } |
| } |
| } |
| |
| #ifdef ENABLE_INJECTION_POINT |
| if (config::enable_update_delete_bitmap_kv_check) { |
| if (!check_delete_bitmap_kv_exists(code, msg, txn, key, instance_id, tablet_id, |
| request->lock_id())) { |
| return; |
| } |
| } |
| #endif |
| |
| if (without_lock && request->has_pre_rowset_agg_end_version() && |
| request->pre_rowset_agg_end_version() > 0) { |
| // check the rowset exists |
| if (non_exist_rowset_ids.contains(request->rowset_ids(i))) { |
| LOG(INFO) << "skip update delete bitmap, rowset_id=" << request->rowset_ids(i) |
| << " version=" << request->pre_rowset_versions(i) |
| << " tablet_id=" << tablet_id << " because the rowset does not exist"; |
| return; |
| } |
| |
| TxnErrorCode err = TxnErrorCode::TXN_OK; |
| doris::RowsetMetaCloudPB rs; |
| if (!is_versioned_read) { |
| auto rowset_key = |
| meta_rowset_key({instance_id, tablet_id, request->pre_rowset_versions(i)}); |
| std::string rowset_val; |
| err = txn->get(rowset_key, &rowset_val); |
| if (err != TxnErrorCode::TXN_OK && TxnErrorCode::TXN_KEY_NOT_FOUND != err) { |
| ss << "failed to get rowset, instance_id=" << instance_id |
| << " tablet_id=" << tablet_id << " version=" << request->pre_rowset_versions(i) |
| << " err=" << err; |
| msg = ss.str(); |
| code = cast_as<ErrCategory::READ>(err); |
| return; |
| } |
| if (err == TxnErrorCode::TXN_OK && |
| !rs.ParseFromArray(rowset_val.data(), rowset_val.size())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "malformed rowset meta, unable to deserialize, tablet_id=" << tablet_id |
| << " key=" << hex(rowset_key); |
| msg = ss.str(); |
| return; |
| } |
| } else { |
| CloneChainReader reader(instance_id, resource_mgr); |
| err = reader.get_rowset_meta(txn.get(), tablet_id, request->pre_rowset_versions(i), |
| &rs); |
| if (err != TxnErrorCode::TXN_OK && TxnErrorCode::TXN_KEY_NOT_FOUND != err) { |
| ss << "failed to get versioned rowset, err=" << err << " tablet_id=" << tablet_id |
| << " version=" << request->pre_rowset_versions(i); |
| msg = ss.str(); |
| code = cast_as<ErrCategory::READ>(err); |
| return; |
| } |
| } |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| non_exist_rowset_ids.emplace(request->rowset_ids(i)); |
| LOG(INFO) << "skip update delete bitmap, rowset_id=" << request->rowset_ids(i) |
| << " version=" << request->pre_rowset_versions(i) |
| << " tablet_id=" << tablet_id << " because the rowset is not exist"; |
| return; |
| } |
| if (rs.rowset_id_v2() != request->rowset_ids(i)) { |
| LOG(INFO) << "skip update delete bitmap, rowset_id=" << request->rowset_ids(i) |
| << " version=" << request->pre_rowset_versions(i) |
| << " tablet_id=" << tablet_id << " because the rowset is not exist"; |
| non_exist_rowset_ids.emplace(request->rowset_ids(i)); |
| return; |
| } |
| } |
| // remove first |
| if (request->lock_id() == COMPACTION_WITHOUT_LOCK_DELETE_BITMAP_LOCK_ID) { |
| auto& start_key = key; |
| std::string end_key {start_key}; |
| encode_int64(INT64_MAX, &end_key); |
| txn->remove(start_key, end_key); |
| LOG(INFO) << "xxx remove delete_bitmap_key=" << hex(start_key) << " tablet_id=" << tablet_id |
| << " lock_id=" << request->lock_id() << " initiator=" << request->initiator(); |
| } |
| // splitting large values (>90*1000) into multiple KVs |
| cloud::blob_put(txn.get(), key, val, 0); |
| txn_stats.current_key_count++; |
| txn_stats.current_value_count += val.size(); |
| txn_stats.total_key_count++; |
| txn_stats.total_value_count += val.size(); |
| VLOG_DEBUG << "xxx update delete bitmap put delete_bitmap_key=" << hex(key) |
| << " lock_id=" << request->lock_id() << " initiator=" << request->initiator() |
| << " key_size: " << key.size() << " value_size: " << val.size(); |
| } |
| |
| void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* controller, |
| const UpdateDeleteBitmapRequest* request, |
| UpdateDeleteBitmapResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(update_delete_bitmap, get, put, del); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(WARNING) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| bool without_lock = request->has_without_lock() ? request->without_lock() : false; |
| if (without_lock && request->has_pre_rowset_agg_end_version() && |
| request->pre_rowset_agg_end_version() > 0) { |
| if (request->rowset_ids_size() != request->pre_rowset_versions_size()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "pre rowset version size=" << request->pre_rowset_versions_size() |
| << " not equal to rowset size=" << request->rowset_ids_size(); |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| if (request->delta_rowset_ids_size() != request->delete_bitmap_storages_size()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "delta_rowset_ids size=" << request->delta_rowset_ids_size() |
| << " not equal to delete_bitmap_storages size=" |
| << request->delete_bitmap_storages_size(); |
| msg = ss.str(); |
| return; |
| } |
| |
| auto store_version = request->has_store_version() ? request->store_version() : 1; |
| if (store_version != 1 && store_version != 2 && store_version != 3) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "delete bitmap store version must be 1, 2, 3"; |
| return; |
| } |
| |
| bool is_versioned_write = is_version_write_enabled(instance_id); |
| if (!without_lock && is_versioned_write && store_version == 1) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "instance enabled versioned write, please set delete_bitmap_store_write_version to 2 " |
| "or 3 in be.conf"; |
| return; |
| } |
| |
| std::string use_version = |
| delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id); |
| RPC_RATE_LIMIT(update_delete_bitmap) |
| |
| uint64_t fdb_txn_size = 0; |
| auto table_id = request->table_id(); |
| auto tablet_id = request->tablet_id(); |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| |
| bool is_explicit_txn = (request->has_is_explicit_txn() && request->is_explicit_txn()); |
| bool is_first_sub_txn = (is_explicit_txn && request->txn_id() == request->lock_id()); |
| std::string log = ", update delete bitmap for tablet " + std::to_string(tablet_id); |
| if (!without_lock) { |
| // 1. Check whether the lock expires |
| std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); |
| DeleteBitmapUpdateLockPB lock_info; |
| if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, request->lock_id(), |
| request->initiator(), lock_key, lock_info, use_version, |
| log)) { |
| LOG(WARNING) << "failed to check delete bitmap lock, table_id=" << table_id |
| << " request lock_id=" << request->lock_id() |
| << " request initiator=" << request->initiator() << " msg " << msg; |
| return; |
| } |
| // 2. Process pending delete bitmap |
| |
| // if this is a txn load and is not the first sub txn, we should not remove |
| // the pending delete bitmaps written by previous sub txns |
| if (!is_explicit_txn || is_first_sub_txn) { |
| if (!remove_pending_delete_bitmap(code, msg, ss, txn, instance_id, tablet_id)) { |
| return; |
| } |
| } |
| } |
| |
| // 3. check if partition's version matches |
| if (request->lock_id() > 0 && request->has_txn_id() && request->partition_id() && |
| request->has_next_visible_version()) { |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| if (!check_partition_version_when_update_delete_bitmap( |
| code, msg, txn, instance_id, table_id, request->partition_id(), tablet_id, |
| request->txn_id(), request->next_visible_version(), is_versioned_read, |
| resource_mgr_.get())) { |
| return; |
| } |
| } |
| |
| // 4. store all pending delete bitmap for this txn |
| std::vector<std::string> delete_bitmap_keys_v1; |
| std::vector<std::string> delete_bitmap_keys_v2; |
| for (size_t i = 0; i < request->rowset_ids_size(); ++i) { |
| std::string key; |
| MetaDeleteBitmapInfo key_info {instance_id, tablet_id, request->rowset_ids(i), |
| request->versions(i), request->segment_ids(i)}; |
| meta_delete_bitmap_key(key_info, &key); |
| delete_bitmap_keys_v1.emplace_back(key); |
| } |
| for (size_t i = 0; i < request->delta_rowset_ids_size(); ++i) { |
| std::string key; |
| versioned::MetaDeleteBitmapInfo key_info {instance_id, tablet_id, |
| request->delta_rowset_ids(i)}; |
| versioned::meta_delete_bitmap_key(key_info, &key); |
| delete_bitmap_keys_v2.emplace_back(key); |
| } |
| |
| PendingDeleteBitmapPB previous_pending_info; |
| if (is_explicit_txn && !is_first_sub_txn) { |
| std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id}); |
| std::string pending_val; |
| auto err = txn->get(pending_key, &pending_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| ss << "failed to get delete bitmap pending info, instance_id=" << instance_id |
| << " tablet_id=" << tablet_id << " key=" << hex(pending_key) << " err=" << err; |
| msg = ss.str(); |
| code = cast_as<ErrCategory::READ>(err); |
| return; |
| } |
| |
| if (err == TxnErrorCode::TXN_OK) { |
| // pending delete bitmaps of previous sub txns |
| if (!previous_pending_info.ParseFromString(pending_val)) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse PendingDeleteBitmapPB"; |
| return; |
| } |
| } |
| } |
| |
| // no need to record pending key for compaction, |
| // because delete bitmap will attach to new rowset, just delete new rowset if failed |
| // lock_id > 0 : load |
| // lock_id = -1 : compaction |
| // lock_id = -2 : schema change |
| // lock_id = -3 : compaction update delete bitmap without lock |
| if (request->lock_id() > 0 || request->lock_id() == -2) { |
| PendingDeleteBitmapPB delete_bitmap_keys; |
| delete_bitmap_keys.mutable_delete_bitmap_keys()->Add(delete_bitmap_keys_v1.begin(), |
| delete_bitmap_keys_v1.end()); |
| delete_bitmap_keys.mutable_delete_bitmap_keys()->Add(delete_bitmap_keys_v2.begin(), |
| delete_bitmap_keys_v2.end()); |
| std::string pending_val; |
| if (is_explicit_txn && !is_first_sub_txn) { |
| // put current delete bitmap keys and previous sub txns' into tablet's pending delete bitmap KV |
| PendingDeleteBitmapPB total_pending_info {std::move(previous_pending_info)}; |
| auto* cur_pending_keys = delete_bitmap_keys.mutable_delete_bitmap_keys(); |
| total_pending_info.mutable_delete_bitmap_keys()->Add(cur_pending_keys->begin(), |
| cur_pending_keys->end()); |
| if (!total_pending_info.SerializeToString(&pending_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize pending delete bitmap"; |
| return; |
| } |
| } else { |
| if (!delete_bitmap_keys.SerializeToString(&pending_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize pending delete bitmap"; |
| return; |
| } |
| } |
| std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id}); |
| txn->put(pending_key, pending_val); |
| fdb_txn_size = fdb_txn_size + pending_key.size() + pending_val.size(); |
| LOG(INFO) << "xxx update delete bitmap put pending_key=" << hex(pending_key) |
| << " lock_id=" << request->lock_id() << " initiator=" << request->initiator() |
| << " value_size: " << pending_val.size(); |
| } |
| |
| // 5. Update delete bitmap for current txn |
| UpdateDeleteBitmapTxnStats txn_stats; |
| std::set<std::string> non_exist_rowset_ids; |
| // write v1 kvs |
| for (size_t i = 0; i < delete_bitmap_keys_v1.size(); ++i) { |
| auto& key = delete_bitmap_keys_v1[i]; |
| auto& val = request->segment_delete_bitmaps(i); |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| _write_delete_bitmap_kvs(code, msg, ss, txn_kv_, txn, use_version, non_exist_rowset_ids, i, |
| log, stats, request, instance_id, key, val, txn_stats, |
| is_versioned_read, resource_mgr_.get()); |
| if (code != MetaServiceCode::OK) return; |
| } |
| // write v2 kvs |
| for (size_t i = 0; i < delete_bitmap_keys_v2.size(); ++i) { |
| auto& key = delete_bitmap_keys_v2[i]; |
| std::string val; |
| if (!request->delete_bitmap_storages(i).SerializeToString(&val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize delete bitmap storage"; |
| return; |
| } |
| LOG(INFO) << "put delete bitmap kv, tablet_id=" << tablet_id |
| << " rowset_id=" << request->delta_rowset_ids(i) << ", delete bitmap num=" |
| << request->delete_bitmap_storages(i).delete_bitmap().rowset_ids_size() |
| << ". key=" << hex(key) << ", value_size=" << val.size(); |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| _write_delete_bitmap_kvs(code, msg, ss, txn_kv_, txn, use_version, non_exist_rowset_ids, i, |
| log, stats, request, instance_id, key, val, txn_stats, |
| is_versioned_read, resource_mgr_.get()); |
| if (code != MetaServiceCode::OK) return; |
| } |
| |
| // remove pre rowset delete bitmap |
| if (request->has_pre_rowset_agg_start_version() && request->has_pre_rowset_agg_end_version() && |
| request->pre_rowset_agg_start_version() < request->pre_rowset_agg_end_version()) { |
| std::string pre_rowset_id = ""; |
| for (size_t i = 0; i < request->rowset_ids_size(); ++i) { |
| if (request->rowset_ids(i) == pre_rowset_id) { |
| continue; |
| } |
| if (non_exist_rowset_ids.contains(request->rowset_ids(i))) { |
| LOG(INFO) << "skip remove pre rowsets delete bitmap, rowset_id=" |
| << request->rowset_ids(i) << " tablet_id=" << tablet_id |
| << " because the rowset does not exist"; |
| continue; |
| } |
| pre_rowset_id = request->rowset_ids(i); |
| auto delete_bitmap_start = |
| meta_delete_bitmap_key({instance_id, tablet_id, request->rowset_ids(i), |
| request->pre_rowset_agg_start_version(), 0}); |
| auto delete_bitmap_end = |
| meta_delete_bitmap_key({instance_id, tablet_id, request->rowset_ids(i), |
| request->pre_rowset_agg_end_version(), 0}); |
| txn->remove(delete_bitmap_start, delete_bitmap_end); |
| LOG(INFO) << "remove pre rowsets delete bitmap, tablet_id=" << tablet_id |
| << ", rowset=" << request->rowset_ids(i) |
| << ", start_version=" << request->pre_rowset_agg_start_version() |
| << ", end_version=" << request->pre_rowset_agg_end_version() |
| << ", start_key=" << hex(delete_bitmap_start) |
| << ", end_key=" << hex(delete_bitmap_end); |
| } |
| } |
| err = txn->commit(); |
| txn_stats.total_txn_put_keys += txn->num_put_keys(); |
| txn_stats.total_txn_put_bytes += txn->put_bytes(); |
| txn_stats.total_txn_size += txn->approximate_bytes(); |
| txn_stats.total_txn_count++; |
| if (err != TxnErrorCode::TXN_OK) { |
| if (err == TxnErrorCode::TXN_CONFLICT) { |
| g_bvar_delete_bitmap_lock_txn_put_conflict_counter << 1; |
| } |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to update delete bitmap, err=" << err << " tablet_id=" << tablet_id |
| << " lock_id=" << request->lock_id() << " initiator=" << request->initiator() |
| << " delete_bitmap_key=" << txn_stats.current_key_count |
| << " delete_bitmap_value=" << txn_stats.current_value_count |
| << " put_size=" << txn->put_bytes() << " num_put_keys=" << txn->num_put_keys() |
| << " txn_size=" << txn->approximate_bytes(); |
| msg = ss.str(); |
| g_bvar_update_delete_bitmap_fail_counter << 1; |
| return; |
| } |
| LOG(INFO) << "update_delete_bitmap tablet_id=" << tablet_id << " lock_id=" << request->lock_id() |
| << " initiator=" << request->initiator() |
| << " v1_rowset_num=" << delete_bitmap_keys_v1.size() |
| << " v2_rowset_num=" << delete_bitmap_keys_v2.size() |
| << " total_key_count=" << txn_stats.total_key_count |
| << " total_value_count=" << txn_stats.total_value_count |
| << " without_lock=" << without_lock |
| << " total_txn_put_keys=" << txn_stats.total_txn_put_keys |
| << " total_txn_put_bytes=" << txn_stats.total_txn_put_bytes |
| << " total_txn_size=" << txn_stats.total_txn_size |
| << " total_txn_count=" << txn_stats.total_txn_count << " instance_id=" << instance_id |
| << " use_version=" << use_version; |
| } |
| |
| void MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* controller, |
| const GetDeleteBitmapRequest* request, |
| GetDeleteBitmapResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_delete_bitmap, get); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(WARNING) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| RPC_RATE_LIMIT(get_delete_bitmap) |
| |
| auto tablet_id = request->tablet_id(); |
| auto& rowset_ids = request->rowset_ids(); |
| auto& begin_versions = request->begin_versions(); |
| auto& end_versions = request->end_versions(); |
| auto store_version = request->has_store_version() ? request->store_version() : 1; |
| int64_t dbm_bytes_threshold = request->has_dbm_bytes_threshold() |
| ? request->dbm_bytes_threshold() |
| : std::numeric_limits<int64_t>::max(); |
| if (store_version != 1 && store_version != 2 && store_version != 3) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "delete bitmap store version must be 1, 2, 3"; |
| return; |
| } |
| bool read_v1 = store_version == 1 || store_version == 3; |
| bool read_v2 = store_version == 2 || store_version == 3; |
| if (rowset_ids.size() != begin_versions.size() || rowset_ids.size() != end_versions.size()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "rowset and version size not match. " |
| << " rowset_size=" << rowset_ids.size() |
| << " begin_version_size=" << begin_versions.size() |
| << " end_version_size=" << end_versions.size(); |
| msg = ss.str(); |
| return; |
| } |
| |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| if (is_versioned_read && store_version == 1) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "instance enabled versioned read, please set delete_bitmap_store_read_version to 2 " |
| "or 3 in be.conf"; |
| return; |
| } |
| |
| response->set_tablet_id(tablet_id); |
| int64_t delete_bitmap_num = 0; |
| int64_t delete_bitmap_byte = 0; |
| bool test = false; |
| TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_test", &test); |
| |
| auto check_delete_bitmap_bytes_exceed_limit = [&]() { |
| if (delete_bitmap_byte > config::max_get_delete_bitmap_byte) { |
| code = MetaServiceCode::KV_TXN_GET_ERR; |
| ss << "tablet=" << tablet_id << ", get_delete_bitmap_byte=" << delete_bitmap_byte |
| << ",exceed max byte"; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| g_bvar_get_delete_bitmap_fail_counter << 1; |
| } |
| }; |
| |
| for (size_t i = 0; i < rowset_ids.size(); i++) { |
| // create a new transaction every time, avoid using one transaction that takes too long |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| }; |
| if (read_v1) { |
| MetaDeleteBitmapInfo start_key_info {instance_id, tablet_id, rowset_ids[i], |
| begin_versions[i], 0}; |
| MetaDeleteBitmapInfo end_key_info {instance_id, tablet_id, rowset_ids[i], |
| end_versions[i], INT64_MAX}; |
| std::string start_key; |
| std::string end_key; |
| meta_delete_bitmap_key(start_key_info, &start_key); |
| meta_delete_bitmap_key(end_key_info, &end_key); |
| |
| // in order to get splitted large value |
| encode_int64(INT64_MAX, &end_key); |
| |
| std::unique_ptr<RangeGetIterator> it; |
| int64_t last_ver = -1; |
| int64_t last_seg_id = -1; |
| int64_t round = 0; |
| do { |
| if (test) { |
| LOG(INFO) << "test"; |
| err = txn->get(start_key, end_key, &it, false, 2); |
| } else { |
| err = txn->get(start_key, end_key, &it); |
| } |
| TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_err", &round, &err); |
| int64_t retry = 0; |
| while (err == TxnErrorCode::TXN_TOO_OLD && retry < 3) { |
| stats.get_bytes += txn->get_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| txn = nullptr; |
| err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to init txn, retry=" << retry << ", internal round=" << round; |
| msg = ss.str(); |
| return; |
| } |
| if (test) { |
| err = txn->get(start_key, end_key, &it, false, 2); |
| } else { |
| err = txn->get(start_key, end_key, &it); |
| } |
| retry++; |
| LOG(INFO) << "retry get delete bitmap, tablet=" << tablet_id |
| << ", retry=" << retry << ", internal round=" << round |
| << ", delete_bitmap_num=" << delete_bitmap_num |
| << ", delete_bitmap_byte=" << delete_bitmap_byte; |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "internal error, failed to get delete bitmap, internal round=" << round |
| << ", ret=" << err; |
| msg = ss.str(); |
| g_bvar_get_delete_bitmap_fail_counter << 1; |
| return; |
| } |
| |
| while (it->has_next()) { |
| auto [k, v] = it->next(); |
| auto k1 = k; |
| k1.remove_prefix(1); |
| std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; |
| decode_key(&k1, &out); |
| // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} |
| // ${rowset_id0} ${version1} ${segment_id0} -> DeleteBitmapPB |
| auto ver = std::get<int64_t>(std::get<0>(out[5])); |
| auto seg_id = std::get<int64_t>(std::get<0>(out[6])); |
| |
| // FIXME: Don't expose the implementation details of splitting large value. |
| // merge splitted large values (>90*1000) |
| if (ver != last_ver || seg_id != last_seg_id) { |
| response->add_rowset_ids(rowset_ids[i]); |
| response->add_segment_ids(seg_id); |
| response->add_versions(ver); |
| response->add_segment_delete_bitmaps(std::string(v)); |
| last_ver = ver; |
| last_seg_id = seg_id; |
| delete_bitmap_num++; |
| delete_bitmap_byte += v.length(); |
| } else { |
| TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_code", &code); |
| if (code != MetaServiceCode::OK) { |
| ss << "test get get_delete_bitmap fail, code=" |
| << MetaServiceCode_Name(code) << ", internal round=" << round; |
| msg = ss.str(); |
| return; |
| } |
| delete_bitmap_byte += v.length(); |
| response->mutable_segment_delete_bitmaps()->rbegin()->append(v); |
| } |
| } |
| check_delete_bitmap_bytes_exceed_limit(); |
| if (code != MetaServiceCode::OK) return; |
| round++; |
| start_key = it->next_begin_key(); // Update to next smallest key for iteration |
| } while (it->more()); |
| LOG(INFO) << "get delete bitmap for tablet=" << tablet_id |
| << ", rowset=" << rowset_ids[i] << ", start version=" << begin_versions[i] |
| << ", end version=" << end_versions[i] << ", internal round=" << round |
| << ", delete_bitmap_num=" << delete_bitmap_num |
| << ", delete_bitmap_byte=" << delete_bitmap_byte; |
| } |
| if (read_v2 && begin_versions[i] == 0) { |
| check_delete_bitmap_bytes_exceed_limit(); |
| if (code != MetaServiceCode::OK) return; |
| |
| DeleteBitmapStoragePB delete_bitmap_storage; |
| CloneChainReader reader(instance_id, resource_mgr_.get()); |
| err = reader.get_delete_bitmap_v2(txn.get(), tablet_id, rowset_ids[i], |
| &delete_bitmap_storage); |
| |
| int64_t retry = 0; |
| TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_err", &retry, &err); |
| while (err == TxnErrorCode::TXN_TOO_OLD && retry < 3) { |
| stats.get_bytes += txn->get_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| txn = nullptr; |
| err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to init txn, retry=" << retry; |
| msg = ss.str(); |
| return; |
| } |
| err = reader.get_delete_bitmap_v2(txn.get(), tablet_id, rowset_ids[i], |
| &delete_bitmap_storage); |
| retry++; |
| LOG(INFO) << "retry get delete bitmap, tablet=" << tablet_id << ", retry=" << retry |
| << ", delete_bitmap_num=" << delete_bitmap_num |
| << ", delete_bitmap_byte=" << delete_bitmap_byte; |
| TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_err", &retry, &err); |
| } |
| |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| LOG(INFO) << "delete bitmap not found for tablet=" << tablet_id |
| << ", rowset=" << rowset_ids[i]; |
| continue; |
| } else if (err == TxnErrorCode::TXN_INVALID_DATA) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse DeleteBitmapStoragePB for tablet=" << tablet_id |
| << ", rowset=" << rowset_ids[i]; |
| msg = ss.str(); |
| return; |
| } else if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get delete bitmap, ret=" << err; |
| msg = ss.str(); |
| return; |
| } |
| response->add_delta_rowset_ids(rowset_ids[i]); |
| response->add_delete_bitmap_storages()->CopyFrom(delete_bitmap_storage); |
| LOG(INFO) << "get delete bitmap for tablet=" << tablet_id |
| << ", rowset=" << rowset_ids[i] |
| << ", store_in_fdb=" << delete_bitmap_storage.store_in_fdb() |
| << ", delete_bitmap_num=" |
| << delete_bitmap_storage.delete_bitmap().rowset_ids_size(); |
| delete_bitmap_num += delete_bitmap_storage.delete_bitmap().rowset_ids_size(); |
| delete_bitmap_byte += delete_bitmap_storage.ByteSizeLong(); |
| } |
| |
| response->add_returned_rowset_ids(rowset_ids[i]); |
| if (delete_bitmap_byte >= dbm_bytes_threshold && i < rowset_ids.size() - 1) { |
| response->set_has_more(true); |
| LOG_INFO("stop in advance to get delete bitmap due to reach bytes threshold") |
| .tag("instance_id", instance_id) |
| .tag("tablet_id", tablet_id) |
| .tag("req_rowset_num", rowset_ids.size()) |
| .tag("finished_rowset_num", response->returned_rowset_ids_size()) |
| .tag("delete_bitmap_num", delete_bitmap_num) |
| .tag("delete_bitmap_byte", delete_bitmap_byte) |
| .tag("bytes_threshold", dbm_bytes_threshold); |
| break; |
| } |
| } |
| LOG(INFO) << "finish get delete bitmap for tablet=" << tablet_id |
| << ", delete_bitmap_num=" << delete_bitmap_num |
| << ", delete_bitmap_byte=" << delete_bitmap_byte; |
| |
| if (request->has_idx()) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| }; |
| TabletIndexPB idx(request->idx()); |
| TabletStatsPB tablet_stat; |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| if (!is_versioned_read) { |
| internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, tablet_stat, |
| true /*snapshot_read*/); |
| if (code != MetaServiceCode::OK) { |
| return; |
| } |
| } else { |
| CloneChainReader reader(instance_id, resource_mgr_.get()); |
| TxnErrorCode err = |
| reader.get_tablet_merged_stats(txn.get(), tablet_id, &tablet_stat, nullptr); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get versioned tablet stats, err={}, tablet_id={}", err, |
| tablet_id); |
| LOG(WARNING) << msg; |
| return; |
| } |
| } |
| // The requested compaction state and the actual compaction state are different, which indicates that |
| // the requested rowsets are expired and their delete bitmap may have been deleted. |
| if (request->base_compaction_cnt() != tablet_stat.base_compaction_cnt() || |
| request->cumulative_compaction_cnt() != tablet_stat.cumulative_compaction_cnt() || |
| request->cumulative_point() != tablet_stat.cumulative_point()) { |
| code = MetaServiceCode::ROWSETS_EXPIRED; |
| msg = "rowsets are expired"; |
| return; |
| } |
| } |
| } |
| |
| static bool put_mow_tablet_job_key(MetaServiceCode& code, std::string& msg, |
| std::unique_ptr<Transaction>& txn, std::string& instance_id, |
| int64_t table_id, int64_t lock_id, int64_t initiator, |
| int64_t expiration, std::string& current_lock_msg) { |
| std::string tablet_job_key = mow_tablet_job_key({instance_id, table_id, initiator}); |
| std::string tablet_job_val; |
| MowTabletJobPB mow_tablet_job; |
| mow_tablet_job.set_expiration(expiration); |
| mow_tablet_job.SerializeToString(&tablet_job_val); |
| if (tablet_job_val.empty()) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "MowTabletJobPB serialization error"; |
| return false; |
| } |
| txn->put(tablet_job_key, tablet_job_val); |
| LOG(INFO) << "xxx put tablet job key=" << hex(tablet_job_key) << " table_id=" << table_id |
| << " lock_id=" << lock_id << " initiator=" << initiator |
| << " expiration=" << expiration << ", " << current_lock_msg; |
| return true; |
| } |
| |
| static bool put_delete_bitmap_update_lock_key(MetaServiceCode& code, std::string& msg, |
| std::unique_ptr<Transaction>& txn, int64_t table_id, |
| int64_t lock_id, int64_t initiator, |
| std::string& lock_key, |
| DeleteBitmapUpdateLockPB& lock_info, |
| std::string& current_lock_msg) { |
| std::string lock_val; |
| lock_info.SerializeToString(&lock_val); |
| if (lock_val.empty()) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "DeleteBitmapUpdateLockPB serialization error"; |
| return false; |
| } |
| txn->put(lock_key, lock_val); |
| LOG(INFO) << "xxx put lock_key=" << hex(lock_key) << " table_id=" << table_id |
| << " lock_id=" << lock_id << " initiator=" << initiator |
| << " initiators_size=" << lock_info.initiators_size() << ", " << current_lock_msg; |
| return true; |
| } |
| |
| bool MetaServiceImpl::get_mow_tablet_stats_and_meta(MetaServiceCode& code, std::string& msg, |
| const GetDeleteBitmapUpdateLockRequest* request, |
| GetDeleteBitmapUpdateLockResponse* response, |
| std::string& instance_id, std::string& lock_key, |
| std::string lock_use_version, KVStats& stats) { |
| bool require_tablet_stats = |
| request->has_require_compaction_stats() ? request->require_compaction_stats() : false; |
| if (!require_tablet_stats) return true; |
| // this request is from fe when it commits txn for MOW table, we send the compaction stats |
| // along with the GetDeleteBitmapUpdateLockResponse which will be sent to BE later to let |
| // BE eliminate unnecessary sync_rowsets() calls if possible |
| // 1. hold the delete bitmap update lock in MS(update lock_info.lock_id to current load's txn id) |
| // 2. read tablets' stats |
| // 3. check whether we still hold the delete bitmap update lock |
| // these steps can be done in different fdb txns |
| |
| StopWatch read_stats_sw; |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return false; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.put_bytes += txn->put_bytes(); |
| stats.del_bytes += txn->delete_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| stats.put_counter += txn->num_put_keys(); |
| stats.del_counter += txn->num_del_keys(); |
| }; |
| auto table_id = request->table_id(); |
| std::stringstream ss; |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| if (!config::enable_batch_get_mow_tablet_stats_and_meta) { |
| CloneChainReader reader(instance_id, txn_kv_.get(), resource_mgr_.get()); |
| for (const auto& tablet_idx : request->tablet_indexes()) { |
| // 1. get compaction cnts |
| TabletStatsPB tablet_stat; |
| if (!is_versioned_read) { |
| std::string stats_key = |
| stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), |
| tablet_idx.partition_id(), tablet_idx.tablet_id()}); |
| std::string stats_val; |
| TxnErrorCode err = txn->get(stats_key, &stats_val); |
| TEST_SYNC_POINT_CALLBACK( |
| "get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", &err); |
| if (err == TxnErrorCode::TXN_TOO_OLD) { |
| code = MetaServiceCode::OK; |
| err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to init txn when get tablet stats"; |
| msg = ss.str(); |
| return false; |
| } |
| err = txn->get(stats_key, &stats_val); |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err, |
| tablet_idx.tablet_id()); |
| return false; |
| } |
| if (!tablet_stat.ParseFromArray(stats_val.data(), stats_val.size())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("marformed tablet stats value, key={}", hex(stats_key)); |
| return false; |
| } |
| } else { |
| err = reader.get_tablet_compact_stats(txn.get(), tablet_idx.tablet_id(), |
| &tablet_stat, nullptr); |
| if (err == TxnErrorCode::TXN_TOO_OLD) { |
| code = MetaServiceCode::OK; |
| err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to init txn when get tablet stats"; |
| msg = ss.str(); |
| return false; |
| } |
| err = reader.get_tablet_compact_stats(txn.get(), tablet_idx.tablet_id(), |
| &tablet_stat, nullptr); |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet compact stats, err={}, tablet_id={}", |
| err, tablet_idx.tablet_id()); |
| LOG(WARNING) << msg; |
| return false; |
| } |
| } |
| response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt()); |
| response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt()); |
| response->add_cumulative_points(tablet_stat.cumulative_point()); |
| |
| // 2. get tablet states |
| doris::TabletMetaCloudPB tablet_meta; |
| if (!is_versioned_read) { |
| std::string tablet_meta_key = |
| meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), |
| tablet_idx.partition_id(), tablet_idx.tablet_id()}); |
| std::string tablet_meta_val; |
| err = txn->get(tablet_meta_key, &tablet_meta_val); |
| if (err != TxnErrorCode::TXN_OK) { |
| ss << "failed to get tablet meta" |
| << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "") |
| << " instance_id=" << instance_id << " tablet_id=" << tablet_idx.tablet_id() |
| << " key=" << hex(tablet_meta_key) << " err=" << err; |
| msg = ss.str(); |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND |
| ? MetaServiceCode::TABLET_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| return false; |
| } |
| if (!tablet_meta.ParseFromString(tablet_meta_val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "malformed tablet meta"; |
| return false; |
| } |
| } else { |
| err = reader.get_tablet_meta(txn.get(), tablet_idx.tablet_id(), &tablet_meta, |
| nullptr); |
| if (err != TxnErrorCode::TXN_OK) { |
| ss << "failed to get versioned tablet meta" |
| << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "") |
| << " instance_id=" << instance_id << " tablet_id=" << tablet_idx.tablet_id() |
| << " err=" << err; |
| msg = ss.str(); |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND |
| ? MetaServiceCode::TABLET_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| return false; |
| } |
| } |
| response->add_tablet_states( |
| static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state())); |
| } |
| } else if (!is_versioned_read) { |
| // 1. get compaction cnts |
| std::vector<std::string> stats_tablet_keys; |
| for (const auto& tablet_idx : request->tablet_indexes()) { |
| stats_tablet_keys.push_back( |
| stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), |
| tablet_idx.partition_id(), tablet_idx.tablet_id()})); |
| } |
| std::vector<std::optional<std::string>> stats_tablet_values; |
| err = txn->batch_get(&stats_tablet_values, stats_tablet_keys); |
| TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", |
| &err); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet stats, err={} table_id={} lock_id={}", err, |
| table_id, request->lock_id()); |
| return false; |
| } |
| for (size_t i = 0; i < stats_tablet_keys.size(); i++) { |
| if (!stats_tablet_values[i].has_value()) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err, |
| request->tablet_indexes(i).tablet_id()); |
| return false; |
| } |
| TabletStatsPB tablet_stat; |
| if (!tablet_stat.ParseFromString(stats_tablet_values[i].value())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("marformed tablet stats value"); |
| return false; |
| } |
| response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt()); |
| response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt()); |
| response->add_cumulative_points(tablet_stat.cumulative_point()); |
| } |
| stats_tablet_keys.clear(); |
| stats_tablet_values.clear(); |
| DCHECK(request->tablet_indexes_size() == response->base_compaction_cnts_size()); |
| DCHECK(request->tablet_indexes_size() == response->cumulative_compaction_cnts_size()); |
| DCHECK(request->tablet_indexes_size() == response->cumulative_points_size()); |
| |
| // 2. get tablet states |
| std::vector<std::string> tablet_meta_keys; |
| for (const auto& tablet_idx : request->tablet_indexes()) { |
| tablet_meta_keys.push_back( |
| meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), |
| tablet_idx.partition_id(), tablet_idx.tablet_id()})); |
| } |
| std::vector<std::optional<std::string>> tablet_meta_values; |
| err = txn->batch_get(&tablet_meta_values, tablet_meta_keys); |
| if (err == TxnErrorCode::TXN_TOO_OLD) { |
| code = MetaServiceCode::OK; |
| err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| ss << "failed to init txn when get tablet meta"; |
| msg = ss.str(); |
| return false; |
| } |
| err = txn->batch_get(&tablet_meta_values, tablet_meta_keys); |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet meta, err={} table_id={} lock_id={}", err, |
| table_id, request->lock_id()); |
| return false; |
| } |
| for (size_t i = 0; i < tablet_meta_keys.size(); i++) { |
| if (!tablet_meta_values[i].has_value()) { |
| code = MetaServiceCode::TABLET_NOT_FOUND; |
| msg = fmt::format("failed to get tablet meta (not found), tablet_id={}", |
| request->tablet_indexes(i).tablet_id()); |
| return false; |
| } |
| doris::TabletMetaCloudPB tablet_meta; |
| if (!tablet_meta.ParseFromString(tablet_meta_values[i].value())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("marformed tablet meta value"); |
| return false; |
| } |
| response->add_tablet_states( |
| static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state())); |
| } |
| DCHECK(request->tablet_indexes_size() == response->tablet_states_size()); |
| } else { |
| CloneChainReader reader(instance_id, txn_kv_.get(), resource_mgr_.get()); |
| std::vector<int64_t> tablet_ids; |
| tablet_ids.reserve(request->tablet_indexes_size()); |
| for (const auto& tablet_idx : request->tablet_indexes()) { |
| tablet_ids.push_back(tablet_idx.tablet_id()); |
| } |
| |
| // 1. get compaction cnts |
| std::unordered_map<int64_t, TabletStatsPB> tablet_stats; |
| err = reader.get_tablet_compact_stats(txn.get(), tablet_ids, &tablet_stats, nullptr); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet compact stats, err={} table_id={} lock_id={}", |
| err, table_id, request->lock_id()); |
| return false; |
| } |
| if (tablet_stats.size() < tablet_ids.size()) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format( |
| "failed to get tablet compact stats (not found), err={} table_id={} lock_id={}", |
| err, table_id, request->lock_id()); |
| return false; |
| } |
| for (int64_t tablet_id : tablet_ids) { |
| auto& tablet_stat = tablet_stats[tablet_id]; |
| response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt()); |
| response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt()); |
| response->add_cumulative_points(tablet_stat.cumulative_point()); |
| } |
| |
| // 2. get tablet states |
| for (const auto& tablet_idx : request->tablet_indexes()) { |
| doris::TabletMetaCloudPB tablet_meta; |
| err = reader.get_tablet_meta(txn.get(), tablet_idx.tablet_id(), &tablet_meta, nullptr); |
| if (err != TxnErrorCode::TXN_OK) { |
| ss << "failed to get versioned tablet meta" |
| << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "") |
| << " instance_id=" << instance_id << " tablet_id=" << tablet_idx.tablet_id() |
| << " err=" << err; |
| msg = ss.str(); |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| return false; |
| } |
| response->add_tablet_states( |
| static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state())); |
| } |
| } |
| |
| read_stats_sw.pause(); |
| LOG(INFO) << fmt::format( |
| "table_id={}, tablet_idxes.size()={}, read tablet compaction cnts and tablet states " |
| "cost={} ms", |
| table_id, request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000); |
| |
| DeleteBitmapUpdateLockPB lock_info_tmp; |
| if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, request->lock_id(), |
| request->initiator(), lock_key, lock_info_tmp, |
| lock_use_version)) { |
| LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats and tablet " |
| "states, table_id=" |
| << table_id << " request lock_id=" << request->lock_id() |
| << " request initiator=" << request->initiator() << " code=" << code |
| << " msg=" << msg; |
| return false; |
| } |
| return true; |
| } |
| |
| void MetaServiceImpl::get_delete_bitmap_update_lock_v2( |
| google::protobuf::RpcController* controller, |
| const GetDeleteBitmapUpdateLockRequest* request, |
| GetDeleteBitmapUpdateLockResponse* response, ::google::protobuf::Closure* done, |
| std::string& instance_id, MetaServiceCode& code, std::string& msg, std::stringstream& ss, |
| KVStats& stats) { |
| VLOG_DEBUG << "get delete bitmap update lock in v2 for table=" << request->table_id() |
| << ",lock id=" << request->lock_id() << ",initiator=" << request->initiator(); |
| auto table_id = request->table_id(); |
| bool urgent = request->has_urgent() && request->urgent(); |
| std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); |
| bool first_retry = true; |
| int64_t retry = 0; |
| while (retry <= 1) { |
| retry++; |
| response->Clear(); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.put_bytes += txn->put_bytes(); |
| stats.del_bytes += txn->delete_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| stats.put_counter += txn->num_put_keys(); |
| stats.del_counter += txn->num_del_keys(); |
| }; |
| std::string lock_val; |
| DeleteBitmapUpdateLockPB lock_info; |
| err = txn->get(lock_key, &lock_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| ss << "failed to get delete bitmap update lock, instance_id=" << instance_id |
| << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; |
| msg = ss.str(); |
| code = MetaServiceCode::KV_TXN_GET_ERR; |
| return; |
| } |
| using namespace std::chrono; |
| int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); |
| int64_t expiration = now + request->expiration(); |
| bool lock_key_not_found = false; |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| lock_key_not_found = true; |
| std::string current_lock_msg = "lock key not found"; |
| lock_info.set_lock_id(request->lock_id()); |
| // compaction does not use this expiration, only used when upgrade ms |
| lock_info.set_expiration(expiration); |
| if (!is_job_delete_bitmap_lock_id(request->lock_id())) { |
| lock_info.add_initiators(request->initiator()); |
| } else { |
| // in normal case, this should remove 0 kvs |
| // but when upgrade ms, if there are ms with old and new versions, it works |
| std::string tablet_job_key_begin = mow_tablet_job_key({instance_id, table_id, 0}); |
| std::string tablet_job_key_end = |
| mow_tablet_job_key({instance_id, table_id, INT64_MAX}); |
| txn->remove(tablet_job_key_begin, tablet_job_key_end); |
| LOG(INFO) << "remove mow tablet job kv, begin=" << hex(tablet_job_key_begin) |
| << " end=" << hex(tablet_job_key_end) << " table_id=" << table_id; |
| if (!put_mow_tablet_job_key(code, msg, txn, instance_id, table_id, |
| request->lock_id(), request->initiator(), expiration, |
| current_lock_msg)) { |
| return; |
| } |
| } |
| if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(), |
| request->initiator(), lock_key, lock_info, |
| current_lock_msg)) { |
| return; |
| } |
| } else if (err == TxnErrorCode::TXN_OK) { |
| if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse DeleteBitmapUpdateLockPB"; |
| return; |
| } |
| if (urgent) { |
| // since currently only the FE Master initiates the lock request for import tasks, |
| // and it does so in a single-threaded manner, there is no need to check the lock id here |
| DCHECK(request->lock_id() > 0); |
| lock_info.clear_initiators(); |
| std::string key0 = mow_tablet_job_key({instance_id, table_id, 0}); |
| std::string key1 = mow_tablet_job_key( |
| {instance_id, table_id, std::numeric_limits<int64_t>::max()}); |
| txn->remove(key0, key1); |
| LOG(INFO) << "remove mow tablet job kv, begin=" << hex(key0) << " end=" << hex(key1) |
| << " table_id=" << table_id; |
| std::string current_lock_msg = |
| "original lock_id=" + std::to_string(lock_info.lock_id()); |
| lock_info.set_lock_id(request->lock_id()); |
| lock_info.set_expiration(expiration); |
| lock_info.add_initiators(request->initiator()); |
| if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(), |
| request->initiator(), lock_key, lock_info, |
| current_lock_msg)) { |
| return; |
| } |
| LOG(INFO) << "force take delete bitmap update lock, table_id=" << table_id |
| << " lock_id=" << request->lock_id(); |
| } else if (!is_job_delete_bitmap_lock_id(lock_info.lock_id())) { |
| if (lock_info.expiration() > 0 && lock_info.expiration() < now) { |
| LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" |
| << lock_info.lock_id() << " table_id=" << table_id |
| << " expiration=" << lock_info.expiration() << " now=" << now |
| << " initiator_size=" << lock_info.initiators_size(); |
| lock_info.clear_initiators(); |
| } else if (lock_info.lock_id() != request->lock_id()) { |
| ss << "already be locked by lock_id=" << lock_info.lock_id() |
| << " expiration=" << lock_info.expiration() << " now=" << now |
| << ", request lock_id=" << request->lock_id() << " table_id=" << table_id |
| << " initiator=" << request->initiator(); |
| msg = ss.str(); |
| code = MetaServiceCode::LOCK_CONFLICT; |
| return; |
| } |
| std::string current_lock_msg = |
| "original lock_id=" + std::to_string(lock_info.lock_id()); |
| lock_info.set_lock_id(request->lock_id()); |
| // compaction does not use the expiration, only used when upgrade ms |
| lock_info.set_expiration(expiration); |
| if (!is_job_delete_bitmap_lock_id(request->lock_id())) { |
| bool found = false; |
| for (auto initiator : lock_info.initiators()) { |
| if (request->initiator() == initiator) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) { |
| lock_info.add_initiators(request->initiator()); |
| } |
| } else { |
| lock_key_not_found = true; |
| // in normal case, this should remove 0 kvs |
| // but when upgrade ms, if there are ms with old and new versions, it works |
| std::string tablet_job_key_begin = |
| mow_tablet_job_key({instance_id, table_id, 0}); |
| std::string tablet_job_key_end = |
| mow_tablet_job_key({instance_id, table_id, INT64_MAX}); |
| txn->remove(tablet_job_key_begin, tablet_job_key_end); |
| LOG(INFO) << "remove mow tablet job kv, begin=" << hex(tablet_job_key_begin) |
| << " end=" << hex(tablet_job_key_end) << " table_id=" << table_id; |
| if (!put_mow_tablet_job_key(code, msg, txn, instance_id, table_id, |
| request->lock_id(), request->initiator(), |
| expiration, current_lock_msg)) { |
| return; |
| } |
| } |
| if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(), |
| request->initiator(), lock_key, lock_info, |
| current_lock_msg)) { |
| return; |
| } |
| } else { |
| if (request->lock_id() == lock_info.lock_id()) { |
| std::string current_lock_msg = |
| "locked by lock_id=" + std::to_string(lock_info.lock_id()); |
| if (!put_mow_tablet_job_key(code, msg, txn, instance_id, table_id, |
| request->lock_id(), request->initiator(), |
| expiration, current_lock_msg)) { |
| return; |
| } |
| } else { |
| // check if compaction key is expired |
| bool has_unexpired_compaction = false; |
| int64_t unexpired_expiration = 0; |
| std::string key0 = mow_tablet_job_key({instance_id, table_id, 0}); |
| std::string key1 = mow_tablet_job_key({instance_id, table_id + 1, 0}); |
| MowTabletJobPB mow_tablet_job; |
| std::unique_ptr<RangeGetIterator> it; |
| int64_t expired_job_num = 0; |
| do { |
| err = txn->get(key0, key1, &it); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "internal error, failed to get mow tablet job, err=" << err; |
| msg = ss.str(); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| while (it->has_next() && !has_unexpired_compaction) { |
| auto [k, v] = it->next(); |
| if (!mow_tablet_job.ParseFromArray(v.data(), v.size())) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse MowTabletJobPB"; |
| return; |
| } |
| if (mow_tablet_job.expiration() > 0 && |
| mow_tablet_job.expiration() < now) { |
| LOG(INFO) << "remove mow tablet job lock. table_id=" << table_id |
| << " lock_id=" << lock_info.lock_id() |
| << " expiration=" << mow_tablet_job.expiration() |
| << " now=" << now << " key=" << hex(k); |
| txn->remove(k); |
| expired_job_num++; |
| } else { |
| has_unexpired_compaction = true; |
| unexpired_expiration = mow_tablet_job.expiration(); |
| } |
| } |
| key0 = it->next_begin_key(); // Update to next smallest key for iteration |
| } while (it->more() && !has_unexpired_compaction); |
| if (has_unexpired_compaction) { |
| // TODO print initiator |
| ss << "already be locked by lock_id=" << lock_info.lock_id() |
| << " expiration=" << unexpired_expiration << " now=" << now |
| << ". request lock_id=" << request->lock_id() << " table_id=" << table_id |
| << " initiator=" << request->initiator(); |
| msg = ss.str(); |
| code = MetaServiceCode::LOCK_CONFLICT; |
| return; |
| } |
| // all job is expired |
| lock_info.set_lock_id(request->lock_id()); |
| lock_info.set_expiration(expiration); |
| lock_info.clear_initiators(); |
| std::string current_lock_msg = |
| std::to_string(expired_job_num) + " job is expired"; |
| if (!is_job_delete_bitmap_lock_id(request->lock_id())) { |
| lock_info.add_initiators(request->initiator()); |
| } else { |
| lock_key_not_found = true; |
| if (!put_mow_tablet_job_key(code, msg, txn, instance_id, table_id, |
| request->lock_id(), request->initiator(), |
| expiration, current_lock_msg)) { |
| return; |
| } |
| } |
| if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, |
| request->lock_id(), request->initiator(), |
| lock_key, lock_info, current_lock_msg)) { |
| return; |
| } |
| } |
| } |
| } |
| |
| err = txn->commit(); |
| TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock:commit:conflict", &first_retry, |
| request, &err); |
| if (err == TxnErrorCode::TXN_OK) { |
| break; |
| } else if (err == TxnErrorCode::TXN_CONFLICT && urgent && request->lock_id() > 0 && |
| first_retry) { |
| g_bvar_delete_bitmap_lock_txn_put_conflict_counter << 1; |
| // fast retry for urgent request when TXN_CONFLICT |
| LOG(INFO) << "fast retry to get_delete_bitmap_update_lock, tablet_id=" |
| << request->table_id() << " lock_id=" << request->lock_id() |
| << ", initiator=" << request->initiator() << "urgent=" << urgent |
| << ", err=" << err; |
| first_retry = false; |
| continue; |
| } else if (err == TxnErrorCode::TXN_CONFLICT && lock_key_not_found && |
| is_job_delete_bitmap_lock_id(request->lock_id()) && |
| config::delete_bitmap_enable_retry_txn_conflict && first_retry) { |
| // if err is TXN_CONFLICT, and the lock id is -1, do a fast retry |
| if (err == TxnErrorCode::TXN_CONFLICT) { |
| g_bvar_delete_bitmap_lock_txn_put_conflict_counter << 1; |
| } |
| LOG(INFO) << "fast retry to get_delete_bitmap_update_lock, tablet_id=" |
| << request->table_id() << " lock_id=" << request->lock_id() |
| << ", initiator=" << request->initiator() << ", err=" << err; |
| first_retry = false; |
| continue; |
| } else { |
| if (err == TxnErrorCode::TXN_CONFLICT) { |
| g_bvar_delete_bitmap_lock_txn_put_conflict_counter << 1; |
| } |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to get_delete_bitmap_update_lock, lock_id=" << request->lock_id() |
| << ", initiator=" << request->initiator() << ", err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| if (!get_mow_tablet_stats_and_meta(code, msg, request, response, instance_id, lock_key, "v2", |
| stats)) { |
| return; |
| } |
| } |
| |
| void MetaServiceImpl::get_delete_bitmap_update_lock_v1( |
| google::protobuf::RpcController* controller, |
| const GetDeleteBitmapUpdateLockRequest* request, |
| GetDeleteBitmapUpdateLockResponse* response, ::google::protobuf::Closure* done, |
| std::string& instance_id, MetaServiceCode& code, std::string& msg, std::stringstream& ss, |
| KVStats& stats) { |
| VLOG_DEBUG << "get delete bitmap update lock in v1 for table=" << request->table_id() |
| << ",lock id=" << request->lock_id() << ",initiator=" << request->initiator(); |
| auto table_id = request->table_id(); |
| bool urgent = request->has_urgent() && request->urgent(); |
| std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); |
| for (int retry = 0; retry <= 1; retry++) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.put_bytes += txn->put_bytes(); |
| stats.del_bytes += txn->delete_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| stats.put_counter += txn->num_put_keys(); |
| stats.del_counter += txn->num_del_keys(); |
| }; |
| std::string lock_val; |
| DeleteBitmapUpdateLockPB lock_info; |
| err = txn->get(lock_key, &lock_val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| ss << "failed to get delete bitmap update lock, instance_id=" << instance_id |
| << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; |
| msg = ss.str(); |
| code = MetaServiceCode::KV_TXN_GET_ERR; |
| return; |
| } |
| using namespace std::chrono; |
| int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); |
| if (err == TxnErrorCode::TXN_OK) { |
| if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse DeleteBitmapUpdateLockPB"; |
| return; |
| } |
| if (urgent) { |
| // since currently only the FE Master initiates the lock request for import tasks, |
| // and it does so in a single-threaded manner, there is no need to check the lock id here |
| DCHECK(request->lock_id() > 0); |
| LOG(INFO) << "force take delete bitmap update lock, table_id=" << table_id |
| << " lock_id=" << request->lock_id() |
| << "prev_lock_id=" << lock_info.lock_id(); |
| lock_info.clear_initiators(); |
| } else if (lock_info.expiration() > 0 && lock_info.expiration() < now) { |
| LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" |
| << lock_info.lock_id() << " table_id=" << table_id << " now=" << now; |
| lock_info.clear_initiators(); |
| } else if (lock_info.lock_id() != request->lock_id()) { |
| ss << "already be locked. request lock_id=" << request->lock_id() |
| << " locked by lock_id=" << lock_info.lock_id() << " table_id=" << table_id |
| << " now=" << now << " expiration=" << lock_info.expiration(); |
| msg = ss.str(); |
| code = MetaServiceCode::LOCK_CONFLICT; |
| return; |
| } |
| } |
| |
| lock_info.set_lock_id(request->lock_id()); |
| lock_info.set_expiration(now + request->expiration()); |
| bool found = false; |
| for (auto initiator : lock_info.initiators()) { |
| if (request->initiator() == initiator) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) { |
| lock_info.add_initiators(request->initiator()); |
| } |
| lock_info.SerializeToString(&lock_val); |
| if (lock_val.empty()) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "pb serialization error"; |
| return; |
| } |
| txn->put(lock_key, lock_val); |
| LOG(INFO) << "xxx put lock_key=" << hex(lock_key) << " table_id=" << table_id |
| << " lock_id=" << request->lock_id() << " initiator=" << request->initiator() |
| << " initiators_size=" << lock_info.initiators_size(); |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| if (err == TxnErrorCode::TXN_CONFLICT) { |
| g_bvar_delete_bitmap_lock_txn_put_conflict_counter << 1; |
| if (retry == 0) { |
| // do a fast retry |
| response->Clear(); |
| code = MetaServiceCode::OK; |
| msg.clear(); |
| continue; |
| } |
| } |
| |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to get_delete_bitmap_update_lock, err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| break; |
| } |
| |
| if (!get_mow_tablet_stats_and_meta(code, msg, request, response, instance_id, lock_key, "v1", |
| stats)) { |
| return; |
| }; |
| } |
| |
| void MetaServiceImpl::remove_delete_bitmap_update_lock_v2( |
| google::protobuf::RpcController* controller, |
| const RemoveDeleteBitmapUpdateLockRequest* request, |
| RemoveDeleteBitmapUpdateLockResponse* response, ::google::protobuf::Closure* done, |
| std::string& instance_id, MetaServiceCode& code, std::string& msg, std::stringstream& ss, |
| KVStats& stats) { |
| VLOG_DEBUG << "remove delete bitmap update lock in v2 for table=" << request->table_id() |
| << ",lock id=" << request->lock_id() << ",initiator=" << request->initiator(); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.put_bytes += txn->put_bytes(); |
| stats.del_bytes += txn->delete_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| stats.put_counter += txn->num_put_keys(); |
| stats.del_counter += txn->num_del_keys(); |
| }; |
| if (is_job_delete_bitmap_lock_id(request->lock_id())) { |
| std::string tablet_job_key = |
| mow_tablet_job_key({instance_id, request->table_id(), request->initiator()}); |
| txn->remove(tablet_job_key); |
| LOG(INFO) << "remove tablet job lock, table_id=" << request->table_id() |
| << " lock_id=" << request->lock_id() << " initiator=" << request->initiator() |
| << " key=" << hex(tablet_job_key); |
| } else { |
| std::string lock_key = |
| meta_delete_bitmap_update_lock_key({instance_id, request->table_id(), -1}); |
| std::string lock_val; |
| DeleteBitmapUpdateLockPB lock_info; |
| if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, request->table_id(), |
| request->lock_id(), request->initiator(), lock_key, lock_info, |
| "v2", ", remove lock")) { |
| LOG(WARNING) << "failed to check delete bitmap tablet lock" |
| << " table_id=" << request->table_id() |
| << " tablet_id=" << request->tablet_id() |
| << " request lock_id=" << request->lock_id() |
| << " request initiator=" << request->initiator() << " msg " << msg; |
| return; |
| } |
| bool modify_initiators = false; |
| auto initiators = lock_info.mutable_initiators(); |
| for (auto iter = initiators->begin(); iter != initiators->end(); iter++) { |
| if (*iter == request->initiator()) { |
| initiators->erase(iter); |
| modify_initiators = true; |
| break; |
| } |
| } |
| if (!modify_initiators) { |
| LOG(INFO) << "initiators don't have initiator=" << request->initiator() |
| << ",initiators_size=" << lock_info.initiators_size() << ",just return"; |
| return; |
| } else if (initiators->empty()) { |
| LOG(INFO) << "remove delete bitmap lock, table_id=" << request->table_id() |
| << " lock_id=" << request->lock_id() << " key=" << hex(lock_key); |
| txn->remove(lock_key); |
| } else { |
| lock_info.SerializeToString(&lock_val); |
| if (lock_val.empty()) { |
| LOG(WARNING) << "failed to seiralize lock_info, table_id=" << request->table_id() |
| << " key=" << hex(lock_key); |
| return; |
| } |
| LOG(INFO) << "remove delete bitmap lock initiator, table_id=" << request->table_id() |
| << ", key=" << hex(lock_key) << " lock_id=" << request->lock_id() |
| << " initiator=" << request->initiator() |
| << " initiators_size=" << lock_info.initiators_size(); |
| txn->put(lock_key, lock_val); |
| } |
| } |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| if (err == TxnErrorCode::TXN_CONFLICT) { |
| g_bvar_delete_bitmap_lock_txn_remove_conflict_by_fail_counter << 1; |
| } |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to remove delete bitmap tablet lock , err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| void MetaServiceImpl::remove_delete_bitmap_update_lock_v1( |
| google::protobuf::RpcController* controller, |
| const RemoveDeleteBitmapUpdateLockRequest* request, |
| RemoveDeleteBitmapUpdateLockResponse* response, ::google::protobuf::Closure* done, |
| std::string& instance_id, MetaServiceCode& code, std::string& msg, std::stringstream& ss, |
| KVStats& stats) { |
| VLOG_DEBUG << "remove delete bitmap update lock in v1 for table=" << request->table_id() |
| << ",lock id=" << request->lock_id() << ",initiator=" << request->initiator(); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| DORIS_CLOUD_DEFER { |
| if (txn == nullptr) return; |
| stats.get_bytes += txn->get_bytes(); |
| stats.put_bytes += txn->put_bytes(); |
| stats.del_bytes += txn->delete_bytes(); |
| stats.get_counter += txn->num_get_keys(); |
| stats.put_counter += txn->num_put_keys(); |
| stats.del_counter += txn->num_del_keys(); |
| }; |
| std::string lock_key = |
| meta_delete_bitmap_update_lock_key({instance_id, request->table_id(), -1}); |
| std::string lock_val; |
| DeleteBitmapUpdateLockPB lock_info; |
| if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, request->table_id(), |
| request->lock_id(), request->initiator(), lock_key, lock_info, |
| "v1")) { |
| LOG(WARNING) << "failed to check delete bitmap tablet lock" |
| << " table_id=" << request->table_id() << " tablet_id=" << request->tablet_id() |
| << " request lock_id=" << request->lock_id() |
| << " request initiator=" << request->initiator() << " msg " << msg; |
| return; |
| } |
| bool modify_initiators = false; |
| auto initiators = lock_info.mutable_initiators(); |
| for (auto iter = initiators->begin(); iter != initiators->end(); iter++) { |
| if (*iter == request->initiator()) { |
| initiators->erase(iter); |
| modify_initiators = true; |
| break; |
| } |
| } |
| if (!modify_initiators) { |
| LOG(INFO) << "initiators don't have initiator=" << request->initiator() |
| << ",initiators_size=" << lock_info.initiators_size() << ",just return"; |
| return; |
| } else if (initiators->empty()) { |
| LOG(INFO) << "remove delete bitmap lock, table_id=" << request->table_id() |
| << " lock_id=" << request->lock_id() << " key=" << hex(lock_key); |
| txn->remove(lock_key); |
| } else { |
| lock_info.SerializeToString(&lock_val); |
| if (lock_val.empty()) { |
| LOG(WARNING) << "failed to seiralize lock_info, table_id=" << request->table_id() |
| << " key=" << hex(lock_key); |
| return; |
| } |
| LOG(INFO) << "remove delete bitmap lock initiator, table_id=" << request->table_id() |
| << ", key=" << hex(lock_key) << " lock_id=" << request->lock_id() |
| << " initiator=" << request->initiator() |
| << " initiators_size=" << lock_info.initiators_size(); |
| txn->put(lock_key, lock_val); |
| } |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to remove delete bitmap tablet lock , err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcController* controller, |
| const GetDeleteBitmapUpdateLockRequest* request, |
| GetDeleteBitmapUpdateLockResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_delete_bitmap_update_lock, get, put, del); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| return; |
| } |
| if (request->has_urgent() && request->urgent() && request->lock_id() < 0) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "only load can set urgent flag currently"; |
| return; |
| } |
| RPC_RATE_LIMIT(get_delete_bitmap_update_lock) |
| std::string use_version = |
| delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id); |
| LOG(INFO) << "get_delete_bitmap_update_lock instance_id=" << instance_id |
| << " use_version=" << use_version; |
| if (use_version == "v2") { |
| get_delete_bitmap_update_lock_v2(controller, request, response, done, instance_id, code, |
| msg, ss, stats); |
| } else { |
| get_delete_bitmap_update_lock_v1(controller, request, response, done, instance_id, code, |
| msg, ss, stats); |
| } |
| |
| if (request->lock_id() > 0 && code == MetaServiceCode::KV_TXN_CONFLICT) { |
| // For load, the only fdb txn conflict here is due to compaction(sc) job. |
| // We turn it into a lock conflict error to skip the MS RPC backoff becasue it's too long |
| // and totally let FE to control the retry backoff sleep time |
| code = MetaServiceCode::LOCK_CONFLICT; |
| } |
| } |
| |
| void MetaServiceImpl::remove_delete_bitmap_update_lock( |
| google::protobuf::RpcController* controller, |
| const RemoveDeleteBitmapUpdateLockRequest* request, |
| RemoveDeleteBitmapUpdateLockResponse* response, ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(remove_delete_bitmap_update_lock, get, put, del); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| return; |
| } |
| RPC_RATE_LIMIT(remove_delete_bitmap_update_lock) |
| std::string use_version = |
| delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id); |
| LOG(INFO) << "remove_delete_bitmap_update_lock instance_id=" << instance_id |
| << " use_version=" << use_version; |
| if (use_version == "v2") { |
| remove_delete_bitmap_update_lock_v2(controller, request, response, done, instance_id, code, |
| msg, ss, stats); |
| } else { |
| remove_delete_bitmap_update_lock_v1(controller, request, response, done, instance_id, code, |
| msg, ss, stats); |
| } |
| } |
| |
| void MetaServiceImpl::remove_delete_bitmap(google::protobuf::RpcController* controller, |
| const RemoveDeleteBitmapRequest* request, |
| RemoveDeleteBitmapResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(remove_delete_bitmap, del); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(WARNING) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| RPC_RATE_LIMIT(remove_delete_bitmap) |
| auto tablet_id = request->tablet_id(); |
| auto& rowset_ids = request->rowset_ids(); |
| auto& begin_versions = request->begin_versions(); |
| auto& end_versions = request->end_versions(); |
| if (rowset_ids.size() != begin_versions.size() || rowset_ids.size() != end_versions.size()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "rowset and version size not match. " |
| << " rowset_size=" << rowset_ids.size() |
| << " begin_version_size=" << begin_versions.size() |
| << " end_version_size=" << end_versions.size(); |
| msg = ss.str(); |
| return; |
| } |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to init txn"; |
| return; |
| } |
| for (size_t i = 0; i < rowset_ids.size(); i++) { |
| auto delete_bitmap_start = meta_delete_bitmap_key( |
| {instance_id, tablet_id, rowset_ids[i], begin_versions[i], 0}); |
| auto delete_bitmap_end = meta_delete_bitmap_key( |
| {instance_id, tablet_id, rowset_ids[i], end_versions[i], INT64_MAX}); |
| txn->remove(delete_bitmap_start, delete_bitmap_end); |
| } |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "failed to commit job kv, err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| LOG(INFO) << "remove_delete_bitmap,tablet_id=" << tablet_id |
| << ",rowset_num=" << rowset_ids.size(); |
| } |
| |
| std::pair<MetaServiceCode, std::string> MetaServiceImpl::get_instance_info( |
| const std::string& instance_id, const std::string& cloud_unique_id, |
| InstanceInfoPB* instance) { |
| std::string cloned_instance_id = instance_id; |
| if (instance_id.empty()) { |
| if (cloud_unique_id.empty()) { |
| return {MetaServiceCode::INVALID_ARGUMENT, "empty instance_id and cloud_unique_id"}; |
| } |
| // get instance_id by cloud_unique_id |
| cloned_instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (cloned_instance_id.empty()) { |
| std::string msg = |
| fmt::format("cannot find instance_id with cloud_unique_id={}", cloud_unique_id); |
| return {MetaServiceCode::INVALID_ARGUMENT, std::move(msg)}; |
| } |
| } |
| |
| std::unique_ptr<Transaction> txn0; |
| TxnErrorCode err = txn_kv_->create_txn(&txn0); |
| if (err != TxnErrorCode::TXN_OK) { |
| return {cast_as<ErrCategory::CREATE>(err), "failed to create txn"}; |
| } |
| |
| std::shared_ptr<Transaction> txn(txn0.release()); |
| auto [c0, m0] = resource_mgr_->get_instance(txn, cloned_instance_id, instance); |
| if (c0 != TxnErrorCode::TXN_OK) { |
| return {cast_as<ErrCategory::READ>(c0), "failed to get instance, info=" + m0}; |
| } |
| |
| // maybe do not decrypt ak/sk? |
| MetaServiceCode code = MetaServiceCode::OK; |
| std::string msg; |
| decrypt_instance_info(*instance, cloned_instance_id, code, msg, txn); |
| return {code, std::move(msg)}; |
| } |
| |
| std::pair<std::string, std::string> init_key_pair(std::string instance_id, int64_t table_id) { |
| std::string begin_key = stats_tablet_key({instance_id, table_id, 0, 0, 0}); |
| std::string end_key = stats_tablet_key({instance_id, table_id + 1, 0, 0, 0}); |
| return std::make_pair(begin_key, end_key); |
| } |
| |
| MetaServiceResponseStatus MetaServiceImpl::fix_tablet_stats(std::string cloud_unique_id_str, |
| std::string table_id_str, |
| std::string tablet_id_str) { |
| // parse params |
| int64_t table_id; |
| int64_t tablet_id = -1; |
| std::string instance_id; |
| MetaServiceResponseStatus st = |
| parse_fix_tablet_stats_param(resource_mgr_, table_id_str, cloud_unique_id_str, |
| tablet_id_str, table_id, instance_id, tablet_id); |
| if (st.code() != MetaServiceCode::OK) { |
| return st; |
| } |
| |
| bool is_versioned_read = is_version_read_enabled(instance_id); |
| bool is_versioned_write = is_version_write_enabled(instance_id); |
| if (is_versioned_write) { |
| if (tablet_id < 0) { |
| st.set_code(MetaServiceCode::INVALID_ARGUMENT); |
| st.set_msg( |
| "cannot fix tablet stats for all tablets of a table when versioned write is " |
| "enabled, consider specifying tablet_id"); |
| return st; |
| } |
| |
| TabletIndexPB tablet_idx; |
| CloneChainReader reader(instance_id, txn_kv_.get(), resource_mgr_.get()); |
| TxnErrorCode err = reader.get_tablet_index(tablet_id, &tablet_idx); |
| if (err != TxnErrorCode::TXN_OK) { |
| st.set_code(cast_as<ErrCategory::READ>(err)); |
| st.set_msg(fmt::format("failed to get tablet index for tablet_id={}, err={}", tablet_id, |
| err)); |
| return st; |
| } |
| |
| auto&& [code, msg] = fix_versioned_tablet_stats_internal( |
| txn_kv_.get(), instance_id, tablet_idx, is_versioned_read, is_versioned_write, |
| resource_mgr_.get()); |
| st.set_code(code); |
| st.set_msg(std::move(msg)); |
| return st; |
| } |
| |
| std::pair<std::string, std::string> key_pair = init_key_pair(instance_id, table_id); |
| std::string old_begin_key; |
| while (old_begin_key < key_pair.first) { |
| // get tablet stats |
| std::vector<std::shared_ptr<TabletStatsPB>> tablet_stat_shared_ptr_vec_batch; |
| old_begin_key = key_pair.first; |
| |
| // fix tablet stats |
| size_t retry = 0; |
| do { |
| st = fix_tablet_stats_internal(txn_kv_, key_pair, tablet_stat_shared_ptr_vec_batch, |
| instance_id); |
| if (st.code() != MetaServiceCode::OK) { |
| LOG_WARNING("failed to fix tablet stats") |
| .tag("err", st.msg()) |
| .tag("table id", table_id) |
| .tag("retry time", retry); |
| } |
| retry++; |
| } while (st.code() != MetaServiceCode::OK && retry < 3); |
| if (st.code() != MetaServiceCode::OK) { |
| return st; |
| } |
| |
| // Check tablet stats |
| st = check_new_tablet_stats(txn_kv_, instance_id, tablet_stat_shared_ptr_vec_batch); |
| if (st.code() != MetaServiceCode::OK) { |
| return st; |
| } |
| } |
| return st; |
| } |
| |
| std::pair<MetaServiceCode, std::string> MetaServiceImpl::fix_tablet_db_id( |
| const std::string& instance_id, int64_t tablet_id, int64_t db_id) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return {cast_as<ErrCategory::CREATE>(err), "failed to init txn"}; |
| } |
| |
| TabletIndexPB tablet_index_pb; |
| std::string key = meta_tablet_idx_key({instance_id, tablet_id}); |
| std::string value; |
| err = txn->get(key, &value); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| std::string msg = fmt::format("tablet index pb not found, key={}", hex(key)); |
| return {MetaServiceCode::TABLET_NOT_FOUND, msg}; |
| } else if (err != TxnErrorCode::TXN_OK) { |
| std::string msg = |
| fmt::format("failed to get tablet index pb, key={}, err={}", hex(key), err); |
| return {cast_as<ErrCategory::READ>(err), msg}; |
| } else if (!tablet_index_pb.ParseFromString(value)) { |
| std::string msg = fmt::format("failed to parse TabletIndexPB, key={}", hex(key)); |
| return {MetaServiceCode::PROTOBUF_PARSE_ERR, msg}; |
| } else if (tablet_index_pb.db_id() == db_id) { |
| LOG(INFO) << "no need to fix tablet db id, same db id, tablet_id=" << tablet_id |
| << ", db_id=" << db_id; |
| return {MetaServiceCode::OK, ""}; |
| } |
| |
| tablet_index_pb.set_db_id(db_id); |
| txn->put(key, tablet_index_pb.SerializeAsString()); |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| std::string msg = fmt::format( |
| "failed to commit txn to fix tablet db id, tablet_id={}, db_id={}, err={}", |
| tablet_id, db_id, err); |
| return {cast_as<ErrCategory::COMMIT>(err), msg}; |
| } |
| |
| return {MetaServiceCode::OK, ""}; |
| } |
| |
| std::size_t get_segments_key_bounds_bytes(const doris::RowsetMetaCloudPB& rowset_meta) { |
| size_t ret {0}; |
| for (const auto& key_bounds : rowset_meta.segments_key_bounds()) { |
| ret += key_bounds.ByteSizeLong(); |
| } |
| return ret; |
| } |
| |
| void MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* controller, |
| const GetSchemaDictRequest* request, |
| GetSchemaDictResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_schema_dict, get); |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(WARNING) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| |
| if (!request->has_index_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "missing index_id in request"; |
| return; |
| } |
| |
| RPC_RATE_LIMIT(get_schema_dict) |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to init txn"; |
| return; |
| } |
| |
| std::string dict_key = meta_schema_pb_dictionary_key({instance_id, request->index_id()}); |
| ValueBuf dict_val; |
| err = cloud::blob_get(txn.get(), dict_key, &dict_val); |
| LOG(INFO) << "Retrieved column pb dictionary, index_id=" << request->index_id() |
| << " key=" << hex(dict_key) << " error=" << err; |
| if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK) { |
| // Handle retrieval error. |
| ss << "Failed to retrieve column pb dictionary, instance_id=" << instance_id |
| << " table_id=" << request->index_id() << " key=" << hex(dict_key) << " error=" << err; |
| msg = ss.str(); |
| code = cast_as<ErrCategory::READ>(err); |
| return; |
| } |
| SchemaCloudDictionary schema_dict; |
| if (err == TxnErrorCode::TXN_OK && !dict_val.to_pb(&schema_dict)) { |
| // Handle parse error. |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("Malformed tablet dictionary value, key={}", hex(dict_key)); |
| return; |
| } |
| |
| response->mutable_schema_dict()->Swap(&schema_dict); |
| } |
| |
| void MetaServiceImpl::update_packed_file_info(::google::protobuf::RpcController* controller, |
| const UpdatePackedFileInfoRequest* request, |
| UpdatePackedFileInfoResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(update_packed_file_info); |
| |
| // Validate request parameters |
| if (!request->has_cloud_unique_id() || request->cloud_unique_id().empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud_unique_id is required"; |
| return; |
| } |
| |
| if (!request->has_packed_file_path() || request->packed_file_path().empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "packed_file_path is required"; |
| return; |
| } |
| |
| if (!request->has_packed_file_info()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "packed_file_info is required"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); |
| return; |
| } |
| |
| RPC_RATE_LIMIT(update_packed_file_info) |
| |
| // Create transaction |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = MetaServiceCode::KV_TXN_CREATE_ERR; |
| msg = fmt::format("failed to create txn, err={}", err); |
| return; |
| } |
| |
| // Generate packed file key |
| PackedFileKeyInfo key_info {instance_id, request->packed_file_path()}; |
| std::string packed_file_key_str = packed_file_key(key_info); |
| |
| // Serialize packed file info |
| std::string packed_file_info_val; |
| if (!request->packed_file_info().SerializeToString(&packed_file_info_val)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize packed_file_info"; |
| return; |
| } |
| |
| // Put packed file info |
| txn->put(packed_file_key_str, packed_file_info_val); |
| |
| // Commit transaction |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = MetaServiceCode::KV_TXN_COMMIT_ERR; |
| msg = fmt::format("failed to commit txn, err={}", err); |
| return; |
| } |
| |
| LOG(INFO) << "successfully updated packed file info" |
| << ", instance_id=" << instance_id |
| << ", packed_file_path=" << request->packed_file_path() |
| << ", total_slice_num=" << request->packed_file_info().total_slice_num() |
| << ", ref_cnt=" << request->packed_file_info().ref_cnt() |
| << ", key=" << hex(packed_file_key_str); |
| } |
| |
| std::string hide_access_key(const std::string& ak) { |
| std::string key = ak; |
| size_t key_len = key.length(); |
| size_t reserved_count; |
| if (key_len > 6) { |
| reserved_count = 6; |
| } else if (key_len > 2) { |
| reserved_count = key_len - 2; |
| } else { |
| reserved_count = 0; |
| } |
| |
| size_t x_count = key_len - reserved_count; |
| size_t left_x_count = x_count / 2; |
| |
| if (left_x_count > 0) { |
| key.replace(0, left_x_count, left_x_count, 'x'); |
| } |
| |
| if (x_count - left_x_count > 0) { |
| key.replace(key_len - (x_count - left_x_count), x_count - left_x_count, |
| x_count - left_x_count, 'x'); |
| } |
| return key; |
| } |
| |
| std::pair<MetaServiceCode, std::string> MetaServiceImpl::wait_for_pending_txns( |
| const std::string& instance_id, std::vector<VersionPB>& versions) { |
| std::vector<std::shared_ptr<TxnLazyCommitTask>> tasks; |
| for (auto& version : versions) { |
| if (version.pending_txn_ids_size() == 0) { |
| continue; |
| } |
| |
| DCHECK_EQ(version.pending_txn_ids_size(), 1) |
| << "only support one pending txn id for now, version=" |
| << version.ShortDebugString(); |
| |
| int64_t first_txn_id = version.pending_txn_ids(0); |
| std::shared_ptr<TxnLazyCommitTask> task = |
| txn_lazy_committer_->submit(instance_id, first_txn_id); |
| tasks.push_back(task); |
| } |
| |
| for (auto& task : tasks) { |
| auto&& [code, msg] = task->wait(); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "wait for pending txn " << task->txn_id() << " failed, code=" << code |
| << " msg=" << msg; |
| return {code, msg}; |
| } |
| } |
| |
| for (auto& version : versions) { |
| if (version.pending_txn_ids_size() == 0) { |
| continue; |
| } |
| version.clear_pending_txn_ids(); |
| version.set_version(version.version() + 1); |
| } |
| |
| return {MetaServiceCode::OK, ""}; |
| } |
| |
| } // namespace doris::cloud |