| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "meta-store/meta_reader.h" |
| |
| #include <gen_cpp/cloud.pb.h> |
| #include <gen_cpp/olap_file.pb.h> |
| |
| #include <limits> |
| #include <memory> |
| |
| #include "common/logging.h" |
| #include "common/util.h" |
| #include "meta-store/codec.h" |
| #include "meta-store/document_message.h" |
| #include "meta-store/document_message_get_range.h" |
| #include "meta-store/keys.h" |
| #include "meta-store/txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| #include "meta-store/versioned_value.h" |
| |
| namespace doris::cloud { |
| |
| TxnErrorCode MetaReader::get_table_version(int64_t table_id, Versionstamp* table_version, |
| bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_table_version(txn.get(), table_id, table_version, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_table_version(Transaction* txn, int64_t table_id, |
| Versionstamp* table_version, bool snapshot) { |
| std::string table_version_key = versioned::table_version_key({instance_id_, table_id}); |
| std::string table_version_value; |
| Versionstamp key_version; |
| TxnErrorCode err = versioned_get(txn, table_version_key, snapshot_version_, &key_version, |
| &table_version_value, snapshot); |
| if (err == TxnErrorCode::TXN_OK) { |
| if (table_version) { |
| *table_version = key_version; |
| } |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, key_version); |
| } |
| return err; |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_meta(int64_t tablet_id, TabletMetaCloudPB* tablet_meta, |
| Versionstamp* versionstamp, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| return get_tablet_meta(txn.get(), tablet_id, tablet_meta, versionstamp, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_meta(Transaction* txn, int64_t tablet_id, |
| TabletMetaCloudPB* tablet_meta, Versionstamp* versionstamp, |
| bool snapshot) { |
| std::string tablet_meta_key = versioned::meta_tablet_key({instance_id_, tablet_id}); |
| Versionstamp key_version; |
| TxnErrorCode err = versioned::document_get(txn, tablet_meta_key, snapshot_version_, tablet_meta, |
| &key_version, snapshot); |
| if (err == TxnErrorCode::TXN_OK) { |
| if (versionstamp) { |
| *versionstamp = key_version; |
| } |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, key_version); |
| } |
| return err; |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_schema(int64_t index_id, int64_t schema_version, |
| TabletSchemaCloudPB* tablet_schema, bool snapshot) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| return get_tablet_schema(txn.get(), index_id, schema_version, tablet_schema, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_schema(Transaction* txn, int64_t index_id, |
| int64_t schema_version, |
| TabletSchemaCloudPB* tablet_schema, bool snapshot) { |
| std::string tablet_schema_key = |
| versioned::meta_schema_key({instance_id_, index_id, schema_version}); |
| return document_get(txn, tablet_schema_key, tablet_schema, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_partition_version(int64_t partition_id, VersionPB* version, |
| Versionstamp* partition_version, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| return get_partition_version(txn.get(), partition_id, version, partition_version, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_partition_version(Transaction* txn, int64_t partition_id, |
| VersionPB* version, Versionstamp* partition_version, |
| bool snapshot) { |
| std::string partition_version_key = |
| versioned::partition_version_key({instance_id_, partition_id}); |
| std::string partition_version_value; |
| Versionstamp key_version; |
| TxnErrorCode err = versioned_get(txn, partition_version_key, snapshot_version_, &key_version, |
| &partition_version_value, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| if (partition_version) { |
| *partition_version = key_version; |
| } |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, key_version); |
| |
| if (version && !version->ParseFromString(partition_version_value)) { |
| LOG_ERROR("Failed to parse VersionPB") |
| .tag("instance_id", instance_id_) |
| .tag("partition_id", partition_id) |
| .tag("key", hex(partition_version_key)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_load_stats(int64_t tablet_id, TabletStatsPB* tablet_stats, |
| Versionstamp* versionstamp, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_tablet_load_stats(txn.get(), tablet_id, tablet_stats, versionstamp, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_load_stats(Transaction* txn, int64_t tablet_id, |
| TabletStatsPB* tablet_stats, |
| Versionstamp* versionstamp, bool snapshot) { |
| std::string tablet_load_stats_key = versioned::tablet_load_stats_key({instance_id_, tablet_id}); |
| std::string tablet_load_stats_value; |
| Versionstamp key_version; |
| TxnErrorCode err = versioned_get(txn, tablet_load_stats_key, snapshot_version_, &key_version, |
| &tablet_load_stats_value, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| if (versionstamp) { |
| *versionstamp = key_version; |
| } |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, key_version); |
| |
| if (tablet_stats && !tablet_stats->ParseFromString(tablet_load_stats_value)) { |
| LOG_ERROR("Failed to parse TabletStatsPB") |
| .tag("instance_id", instance_id_) |
| .tag("tablet_id", tablet_id) |
| .tag("key", hex(tablet_load_stats_key)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_compact_stats(int64_t tablet_id, TabletStatsPB* tablet_stats, |
| Versionstamp* versionstamp, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_tablet_compact_stats(txn.get(), tablet_id, tablet_stats, versionstamp, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_compact_stats(Transaction* txn, int64_t tablet_id, |
| TabletStatsPB* tablet_stats, |
| Versionstamp* versionstamp, bool snapshot) { |
| std::string tablet_compact_stats_key = |
| versioned::tablet_compact_stats_key({instance_id_, tablet_id}); |
| std::string tablet_compact_stats_value; |
| Versionstamp key_version; |
| TxnErrorCode err = versioned_get(txn, tablet_compact_stats_key, snapshot_version_, &key_version, |
| &tablet_compact_stats_value, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| if (versionstamp) { |
| *versionstamp = key_version; |
| } |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, key_version); |
| |
| if (tablet_stats && !tablet_stats->ParseFromString(tablet_compact_stats_value)) { |
| LOG_ERROR("Failed to parse TabletStatsPB") |
| .tag("instance_id", instance_id_) |
| .tag("tablet_id", tablet_id) |
| .tag("key", hex(tablet_compact_stats_key)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_compact_stats( |
| const std::vector<int64_t>& tablet_ids, |
| std::unordered_map<int64_t, TabletStatsPB>* tablet_stats, |
| std::unordered_map<int64_t, Versionstamp>* versionstamps, bool snapshot) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_tablet_compact_stats(txn.get(), tablet_ids, tablet_stats, versionstamps, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_compact_stats( |
| Transaction* txn, const std::vector<int64_t>& tablet_ids, |
| std::unordered_map<int64_t, TabletStatsPB>* tablet_stats, |
| std::unordered_map<int64_t, Versionstamp>* versionstamps, bool snapshot) { |
| if (tablet_ids.empty()) { |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| std::vector<std::string> tablet_compact_stats_keys; |
| for (size_t i = 0; i < tablet_ids.size(); ++i) { |
| int64_t tablet_id = tablet_ids[i]; |
| std::string tablet_compact_stats_key = |
| versioned::tablet_compact_stats_key({instance_id_, tablet_id}); |
| tablet_compact_stats_keys.push_back(std::move(tablet_compact_stats_key)); |
| } |
| |
| std::vector<std::optional<std::pair<std::string, Versionstamp>>> versioned_values; |
| TxnErrorCode err = versioned_batch_get(txn, tablet_compact_stats_keys, snapshot_version_, |
| &versioned_values, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| for (size_t i = 0; i < versioned_values.size(); ++i) { |
| const auto& kv = versioned_values[i]; |
| if (!kv.has_value()) { |
| continue; // Key not found, skip |
| } |
| |
| const std::string& value = kv->first; |
| Versionstamp versionstamp = kv->second; |
| int64_t tablet_id = tablet_ids[i]; |
| |
| if (versionstamps) { |
| versionstamps->emplace(tablet_id, versionstamp); |
| } |
| |
| if (tablet_stats) { |
| TabletStatsPB tablet_stat; |
| if (!tablet_stat.ParseFromString(value)) { |
| LOG_ERROR("Failed to parse TabletStatsPB") |
| .tag("instance_id", instance_id_) |
| .tag("tablet_id", tablet_id) |
| .tag("key", hex(tablet_compact_stats_keys[i])) |
| .tag("value", hex(value)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| tablet_stats->emplace(tablet_id, std::move(tablet_stat)); |
| } |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_load_stats( |
| const std::vector<int64_t>& tablet_ids, |
| std::unordered_map<int64_t, TabletStatsPB>* tablet_stats, |
| std::unordered_map<int64_t, Versionstamp>* versionstamps, bool snapshot) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_tablet_load_stats(txn.get(), tablet_ids, tablet_stats, versionstamps, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_load_stats( |
| Transaction* txn, const std::vector<int64_t>& tablet_ids, |
| std::unordered_map<int64_t, TabletStatsPB>* tablet_stats, |
| std::unordered_map<int64_t, Versionstamp>* versionstamps, bool snapshot) { |
| if (tablet_ids.empty()) { |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| std::vector<std::string> tablet_load_stats_keys; |
| for (size_t i = 0; i < tablet_ids.size(); ++i) { |
| int64_t tablet_id = tablet_ids[i]; |
| std::string tablet_load_stats_key = |
| versioned::tablet_load_stats_key({instance_id_, tablet_id}); |
| tablet_load_stats_keys.push_back(std::move(tablet_load_stats_key)); |
| } |
| |
| std::vector<std::optional<std::pair<std::string, Versionstamp>>> versioned_values; |
| TxnErrorCode err = versioned_batch_get(txn, tablet_load_stats_keys, snapshot_version_, |
| &versioned_values, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| for (size_t i = 0; i < versioned_values.size(); ++i) { |
| const auto& kv = versioned_values[i]; |
| if (!kv.has_value()) { |
| continue; // Key not found, skip |
| } |
| |
| const std::string& value = kv->first; |
| Versionstamp versionstamp = kv->second; |
| int64_t tablet_id = tablet_ids[i]; |
| |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, versionstamp); |
| |
| if (versionstamps) { |
| versionstamps->emplace(tablet_id, versionstamp); |
| } |
| |
| if (tablet_stats) { |
| TabletStatsPB tablet_stat; |
| if (!tablet_stat.ParseFromString(value)) { |
| LOG_ERROR("Failed to parse TabletStatsPB") |
| .tag("instance_id", instance_id_) |
| .tag("tablet_id", tablet_id) |
| .tag("key", hex(tablet_load_stats_keys[i])) |
| .tag("value", hex(value)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| tablet_stats->emplace(tablet_id, std::move(tablet_stat)); |
| } |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_merged_stats(int64_t tablet_id, TabletStatsPB* tablet_stats, |
| Versionstamp* versionstamp, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_tablet_merged_stats(txn.get(), tablet_id, tablet_stats, versionstamp, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_merged_stats(Transaction* txn, int64_t tablet_id, |
| TabletStatsPB* tablet_stats, |
| Versionstamp* versionstamp, bool snapshot) { |
| TabletStatsPB load_stats, compact_stats; |
| Versionstamp load_version, compact_version; |
| TxnErrorCode err = get_tablet_load_stats(txn, tablet_id, &load_stats, &load_version, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| err = get_tablet_compact_stats(txn, tablet_id, &compact_stats, &compact_version, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| if (tablet_stats) { |
| merge_tablet_stats(load_stats, compact_stats, tablet_stats); |
| } |
| Versionstamp read_version = std::min(load_version, compact_version); |
| if (versionstamp) { |
| *versionstamp = read_version; |
| } |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, read_version); |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| void MetaReader::merge_tablet_stats(const TabletStatsPB& load_stats, |
| const TabletStatsPB& compact_stats, |
| TabletStatsPB* tablet_stats) { |
| // The compact_stats is the based tablet stats, and load_stats only contains |
| // the detached stats updated by load operations. |
| tablet_stats->CopyFrom(compact_stats); |
| tablet_stats->set_num_rows(load_stats.num_rows() + compact_stats.num_rows()); |
| tablet_stats->set_num_rowsets(load_stats.num_rowsets() + compact_stats.num_rowsets()); |
| tablet_stats->set_num_segments(load_stats.num_segments() + compact_stats.num_segments()); |
| tablet_stats->set_data_size(load_stats.data_size() + compact_stats.data_size()); |
| tablet_stats->set_index_size(load_stats.index_size() + compact_stats.index_size()); |
| tablet_stats->set_segment_size(load_stats.segment_size() + compact_stats.segment_size()); |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_index(int64_t tablet_id, TabletIndexPB* tablet_index, |
| bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| return get_tablet_index(txn.get(), tablet_id, tablet_index, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_index(Transaction* txn, int64_t tablet_id, |
| TabletIndexPB* tablet_index, bool snapshot) { |
| std::string tablet_index_key = versioned::tablet_index_key({instance_id_, tablet_id}); |
| std::string value; |
| TxnErrorCode err = txn->get(tablet_index_key, &value); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| if (tablet_index && !tablet_index->ParseFromString(value)) { |
| LOG_ERROR("Failed to parse TabletIndexPB") |
| .tag("instance_id", instance_id_) |
| .tag("tablet_id", tablet_id) |
| .tag("key", hex(tablet_index_key)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_table_versions( |
| const std::vector<int64_t>& table_ids, |
| std::unordered_map<int64_t, Versionstamp>* table_versions, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_table_versions(txn.get(), table_ids, table_versions, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_table_versions( |
| Transaction* txn, const std::vector<int64_t>& table_ids, |
| std::unordered_map<int64_t, Versionstamp>* table_versions, bool snapshot) { |
| if (table_ids.empty()) { |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| std::vector<std::string> version_keys; |
| for (size_t i = 0; i < table_ids.size(); ++i) { |
| int64_t table_id = table_ids[i]; |
| std::string table_version_key = versioned::table_version_key({instance_id_, table_id}); |
| version_keys.push_back(std::move(table_version_key)); |
| } |
| |
| std::vector<std::optional<std::pair<std::string, Versionstamp>>> versioned_values; |
| TxnErrorCode err = |
| versioned_batch_get(txn, version_keys, snapshot_version_, &versioned_values, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| for (size_t i = 0; i < versioned_values.size(); ++i) { |
| const auto& kv = versioned_values[i]; |
| if (!kv.has_value()) { |
| continue; // Key not found, skip |
| } |
| |
| Versionstamp version = kv->second; |
| int64_t table_id = table_ids[i]; |
| table_versions->emplace(table_id, version); |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, version); |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_partition_versions( |
| const std::vector<int64_t>& partition_ids, std::unordered_map<int64_t, VersionPB>* versions, |
| std::unordered_map<int64_t, Versionstamp>* versionstamps, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_partition_versions(txn.get(), partition_ids, versions, versionstamps, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_partition_versions( |
| Transaction* txn, const std::vector<int64_t>& partition_ids, |
| std::unordered_map<int64_t, VersionPB>* versions, |
| std::unordered_map<int64_t, Versionstamp>* versionstamps, bool snapshot) { |
| if (partition_ids.empty()) { |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| std::vector<std::string> version_keys; |
| for (size_t i = 0; i < partition_ids.size(); ++i) { |
| int64_t partition_id = partition_ids[i]; |
| std::string partition_version_key = |
| versioned::partition_version_key({instance_id_, partition_id}); |
| version_keys.push_back(std::move(partition_version_key)); |
| } |
| |
| std::vector<std::optional<std::pair<std::string, Versionstamp>>> versioned_values; |
| TxnErrorCode err = |
| versioned_batch_get(txn, version_keys, snapshot_version_, &versioned_values, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| for (size_t i = 0; i < versioned_values.size(); ++i) { |
| const auto& kv = versioned_values[i]; |
| if (!kv.has_value()) { |
| continue; // Key not found, skip |
| } |
| |
| const std::string& value = kv->first; |
| Versionstamp versionstamp = kv->second; |
| int64_t partition_id = partition_ids[i]; |
| |
| if (versionstamps) { |
| versionstamps->emplace(partition_id, versionstamp); |
| } |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, versionstamp); |
| |
| if (versions) { |
| VersionPB version; |
| if (!version.ParseFromString(value)) { |
| LOG_ERROR("Failed to parse VersionPB") |
| .tag("instance_id", instance_id_) |
| .tag("partition_id", partition_id) |
| .tag("key", hex(version_keys[i])) |
| .tag("value", hex(value)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| versions->emplace(partition_id, std::move(version)); |
| } |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_partition_versions(Transaction* txn, |
| const std::vector<int64_t>& partition_ids, |
| std::unordered_map<int64_t, int64_t>* versions, |
| int64_t* last_pending_txn_id, bool snapshot) { |
| std::unordered_map<int64_t, VersionPB> version_pb_map; |
| TxnErrorCode err = |
| get_partition_versions(txn, partition_ids, &version_pb_map, nullptr, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| for (int64_t partition_id : partition_ids) { |
| auto it = version_pb_map.find(partition_id); |
| if (it == version_pb_map.end()) { |
| versions->emplace(partition_id, 1); |
| } else { |
| const VersionPB& version_pb = it->second; |
| int64_t version = version_pb.version(); |
| versions->emplace(partition_id, version); |
| if (last_pending_txn_id && version_pb.pending_txn_ids_size() > 0) { |
| *last_pending_txn_id = version_pb.pending_txn_ids(0); |
| } |
| } |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_all_tablet_ids(std::vector<int64_t>* tablet_ids, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_all_tablet_ids(txn.get(), tablet_ids, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_all_tablet_ids(Transaction* txn, std::vector<int64_t>* tablet_ids, |
| bool snapshot) { |
| { |
| std::string start_key = versioned::tablet_index_key({instance_id_, 0}); |
| std::string end_key = |
| versioned::tablet_index_key({instance_id_, std::numeric_limits<int64_t>::max()}); |
| |
| // [start, end] |
| FullRangeGetOptions options; |
| options.prefetch = false; |
| options.snapshot = snapshot; |
| |
| auto iter = txn->full_range_get(start_key, end_key, options); |
| for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) { |
| auto&& [key, value] = kvp.value(); |
| TabletIndexPB tablet_idx; |
| if (!tablet_idx.ParseFromArray(value.data(), value.size())) { |
| LOG_ERROR("Failed to parse TabletIndexPB") |
| .tag("instance_id", instance_id_) |
| .tag("key", hex(key)) |
| .tag("value", hex(value)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| tablet_ids->emplace_back(tablet_idx.tablet_id()); |
| } |
| } |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_rowset_metas(int64_t tablet_id, int64_t start_version, |
| int64_t end_version, |
| std::vector<RowsetMetaCloudPB>* rowset_metas, |
| bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_rowset_metas(txn.get(), tablet_id, start_version, end_version, rowset_metas, |
| snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_rowset_metas(Transaction* txn, int64_t tablet_id, |
| int64_t start_version, int64_t end_version, |
| std::vector<RowsetMetaCloudPB>* rowset_metas, |
| bool snapshot) { |
| std::map<int64_t, RowsetMetaCloudPB> rowset_graph; |
| |
| { |
| std::string start_key = |
| versioned::meta_rowset_load_key({instance_id_, tablet_id, start_version}); |
| std::string end_key = |
| versioned::meta_rowset_load_key({instance_id_, tablet_id, end_version}); |
| |
| // [start, end] |
| versioned::ReadDocumentMessagesOptions options; |
| options.snapshot = snapshot; |
| options.snapshot_version = snapshot_version_; |
| options.exclude_begin_key = false; |
| options.exclude_end_key = false; |
| |
| auto iter = |
| versioned::document_get_range<RowsetMetaCloudPB>(txn, start_key, end_key, options); |
| for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) { |
| auto&& [key, version, rowset_meta] = *kvp; |
| rowset_graph.emplace(rowset_meta.end_version(), std::move(rowset_meta)); |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, version); |
| DCHECK(version < snapshot_version_) |
| << "version: " << version.to_string() |
| << ", snapshot_version: " << snapshot_version_.to_string(); |
| } |
| if (!iter->is_valid()) { |
| LOG_ERROR("failed to get loaded rowset metas") |
| .tag("instance_id", instance_id_) |
| .tag("tablet_id", tablet_id) |
| .tag("start_version", start_version) |
| .tag("end_version", end_version) |
| .tag("error_code", iter->error_code()); |
| return iter->error_code(); |
| } |
| } |
| |
| { |
| std::string start_key = |
| versioned::meta_rowset_compact_key({instance_id_, tablet_id, start_version}); |
| std::string end_key = |
| versioned::meta_rowset_compact_key({instance_id_, tablet_id, end_version}); |
| |
| // [start, end] |
| versioned::ReadDocumentMessagesOptions options; |
| options.snapshot = snapshot; |
| options.snapshot_version = snapshot_version_; |
| options.exclude_begin_key = false; |
| options.exclude_end_key = false; |
| |
| int64_t last_start_version = std::numeric_limits<int64_t>::max(); |
| auto iter = |
| versioned::document_get_range<RowsetMetaCloudPB>(txn, start_key, end_key, options); |
| for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) { |
| auto&& [key, version, rowset_meta] = *kvp; |
| DCHECK(version < snapshot_version_) |
| << "version: " << version.to_string() |
| << ", snapshot_version: " << snapshot_version_.to_string(); |
| |
| int64_t start_version = rowset_meta.start_version(); |
| int64_t end_version = rowset_meta.end_version(); |
| if (last_start_version <= start_version) { |
| // This compact rowset has been covered by a large compact rowset |
| continue; |
| } |
| |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, version); |
| last_start_version = start_version; |
| // erase the rowsets that are covered by this compact rowset |
| rowset_graph.erase(rowset_graph.lower_bound(start_version), |
| rowset_graph.upper_bound(end_version)); |
| rowset_graph.emplace(end_version, std::move(rowset_meta)); |
| } |
| if (!iter->is_valid()) { |
| LOG_ERROR("failed to get compacted rowset metas") |
| .tag("instance_id", instance_id_) |
| .tag("tablet_id", tablet_id) |
| .tag("start_version", start_version) |
| .tag("end_version", end_version) |
| .tag("error_code", iter->error_code()); |
| return iter->error_code(); |
| } |
| } |
| |
| rowset_metas->clear(); |
| rowset_metas->reserve(rowset_graph.size()); |
| for (auto&& [version, rowset_meta] : rowset_graph) { |
| rowset_metas->emplace_back(std::move(rowset_meta)); |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_load_rowset_meta(int64_t tablet_id, int64_t version, |
| RowsetMetaCloudPB* rowset_meta, |
| Versionstamp* versionstamp, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_load_rowset_meta(txn.get(), tablet_id, version, rowset_meta, versionstamp, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_load_rowset_meta(Transaction* txn, int64_t tablet_id, int64_t version, |
| RowsetMetaCloudPB* rowset_meta, |
| Versionstamp* versionstamp, bool snapshot) { |
| std::string load_rowset_key = |
| versioned::meta_rowset_load_key({instance_id_, tablet_id, version}); |
| TxnErrorCode err = versioned::document_get(txn, load_rowset_key, snapshot_version_, rowset_meta, |
| versionstamp, snapshot); |
| if (err == TxnErrorCode::TXN_OK) { |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, *versionstamp); |
| } |
| return err; |
| } |
| |
| TxnErrorCode MetaReader::get_compact_rowset_meta(int64_t tablet_id, int64_t version, |
| RowsetMetaCloudPB* rowset_meta, |
| Versionstamp* versionstamp, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_compact_rowset_meta(txn.get(), tablet_id, version, rowset_meta, versionstamp, |
| snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_compact_rowset_meta(Transaction* txn, int64_t tablet_id, |
| int64_t version, RowsetMetaCloudPB* rowset_meta, |
| Versionstamp* versionstamp, bool snapshot) { |
| std::string load_rowset_key = |
| versioned::meta_rowset_compact_key({instance_id_, tablet_id, version}); |
| TxnErrorCode err = versioned::document_get(txn, load_rowset_key, snapshot_version_, rowset_meta, |
| versionstamp, snapshot); |
| if (err == TxnErrorCode::TXN_OK) { |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, *versionstamp); |
| } |
| return err; |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_indexes( |
| const std::vector<int64_t>& tablet_ids, |
| std::unordered_map<int64_t, TabletIndexPB>* tablet_indexes, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_tablet_indexes(txn.get(), tablet_ids, tablet_indexes, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_tablet_indexes( |
| Transaction* txn, const std::vector<int64_t>& tablet_ids, |
| std::unordered_map<int64_t, TabletIndexPB>* tablet_indexes, bool snapshot) { |
| if (tablet_ids.empty()) { |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| std::vector<std::string> index_keys; |
| for (size_t i = 0; i < tablet_ids.size(); ++i) { |
| int64_t tablet_id = tablet_ids[i]; |
| std::string tablet_index_key = versioned::tablet_index_key({instance_id_, tablet_id}); |
| index_keys.push_back(std::move(tablet_index_key)); |
| } |
| |
| std::vector<std::optional<std::string>> values; |
| Transaction::BatchGetOptions options; |
| options.snapshot = snapshot; |
| TxnErrorCode err = txn->batch_get(&values, index_keys, options); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| for (size_t i = 0; i < values.size(); ++i) { |
| const auto& kv = values[i]; |
| if (!kv.has_value()) { |
| continue; // Key not found, skip |
| } |
| |
| const std::string& value = kv.value(); |
| int64_t tablet_id = tablet_ids[i]; |
| |
| TabletIndexPB tablet_index; |
| if (!tablet_index.ParseFromString(value)) { |
| LOG_ERROR("Failed to parse TabletIndexPB") |
| .tag("instance_id", instance_id_) |
| .tag("tablet_id", tablet_id) |
| .tag("key", hex(index_keys[i])) |
| .tag("value", hex(value)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| tablet_indexes->emplace(tablet_id, std::move(tablet_index)); |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_partition_pending_txn_id(int64_t partition_id, int64_t* first_txn_id, |
| int64_t* partition_version, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_partition_pending_txn_id(txn.get(), partition_id, first_txn_id, partition_version, |
| snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_partition_pending_txn_id(Transaction* txn, int64_t partition_id, |
| int64_t* first_txn_id, |
| int64_t* partition_version, bool snapshot) { |
| // Initialize to -1 to indicate no pending transactions |
| *first_txn_id = -1; |
| |
| VersionPB version_pb; |
| Versionstamp versionstamp; |
| TxnErrorCode err = |
| get_partition_version(txn, partition_id, &version_pb, &versionstamp, snapshot); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| // No version found, no pending transactions |
| return TxnErrorCode::TXN_OK; |
| } else if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, versionstamp); |
| if (version_pb.pending_txn_ids_size() > 0) { |
| *first_txn_id = version_pb.pending_txn_ids(0); |
| } |
| *partition_version = version_pb.version(); |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_index_index(int64_t index_id, IndexIndexPB* index, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_index_index(txn.get(), index_id, index, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_index_index(Transaction* txn, int64_t index_id, IndexIndexPB* index, |
| bool snapshot) { |
| std::string index_index_key = versioned::index_index_key({instance_id_, index_id}); |
| std::string value; |
| TxnErrorCode err = txn->get(index_index_key, &value, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| if (index && !index->ParseFromString(value)) { |
| LOG_ERROR("Failed to parse IndexIndexPB") |
| .tag("instance_id", instance_id_) |
| .tag("index_id", index_id) |
| .tag("key", hex(index_index_key)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_partition_index(int64_t partition_id, |
| PartitionIndexPB* partition_index, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_partition_index(txn.get(), partition_id, partition_index, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_partition_index(Transaction* txn, int64_t partition_id, |
| PartitionIndexPB* partition_index, bool snapshot) { |
| std::string partition_index_key = versioned::partition_index_key({instance_id_, partition_id}); |
| std::string value; |
| TxnErrorCode err = txn->get(partition_index_key, &value, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| if (partition_index && !partition_index->ParseFromString(value)) { |
| LOG_ERROR("Failed to parse PartitionIndexPB") |
| .tag("instance_id", instance_id_) |
| .tag("partition_id", partition_id) |
| .tag("key", hex(partition_index_key)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::is_index_exists(int64_t index_id, bool snapshot) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return is_index_exists(txn.get(), index_id, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::is_index_exists(Transaction* txn, int64_t index_id, bool snapshot) { |
| std::string key = versioned::meta_index_key({instance_id_, index_id}); |
| std::string value; |
| Versionstamp key_version; |
| TxnErrorCode err = versioned_get(txn, key, snapshot_version_, &key_version, &value, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, key_version); |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::is_partition_exists(int64_t partition_id, bool snapshot) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return is_partition_exists(txn.get(), partition_id, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::is_partition_exists(Transaction* txn, int64_t partition_id, |
| bool snapshot) { |
| std::string key = versioned::meta_partition_key({instance_id_, partition_id}); |
| std::string value; |
| Versionstamp key_version; |
| TxnErrorCode err = versioned_get(txn, key, snapshot_version_, &key_version, &value, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, key_version); |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_snapshots( |
| Transaction* txn, std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots) { |
| std::string snapshot_key = versioned::snapshot_full_key({instance_id_}); |
| std::string snapshot_start_key = encode_versioned_key(snapshot_key, Versionstamp::min()); |
| std::string snapshot_end_key = encode_versioned_key(snapshot_key, Versionstamp::max()); |
| |
| FullRangeGetOptions range_options; |
| range_options.prefetch = true; |
| auto it = txn->full_range_get(snapshot_start_key, snapshot_end_key, range_options); |
| for (auto&& kvp = it->next(); kvp.has_value(); kvp = it->next()) { |
| auto&& [key, snapshot_value] = *kvp; |
| |
| Versionstamp version; |
| std::string_view key_view(key); |
| if (decode_tailing_versionstamp_end(&key_view) || |
| decode_tailing_versionstamp(&key_view, &version)) { |
| LOG_WARNING("failed to decode versionstamp from snapshot full key") |
| .tag("instance_id", instance_id_) |
| .tag("key", hex(key)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| SnapshotPB snapshot; |
| if (!snapshot.ParseFromArray(snapshot_value.data(), snapshot_value.size())) { |
| LOG_ERROR("Failed to parse SnapshotPB") |
| .tag("instance_id", instance_id_) |
| .tag("key", hex(key)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| snapshots->emplace_back(std::move(snapshot), version); |
| } |
| |
| if (!it->is_valid()) { |
| LOG_ERROR("failed to get snapshots") |
| .tag("instance_id", instance_id_) |
| .tag("error_code", it->error_code()); |
| return it->error_code(); |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_snapshots( |
| std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_snapshots(txn.get(), snapshots); |
| } |
| |
| TxnErrorCode MetaReader::get_snapshot(Transaction* txn, Versionstamp snapshot_versionstamp, |
| SnapshotPB* snapshot_pb, bool snapshot) { |
| std::string snapshot_key = versioned::snapshot_full_key({instance_id_}); |
| std::string snapshot_full_key = encode_versioned_key(snapshot_key, snapshot_versionstamp); |
| |
| std::string value; |
| TxnErrorCode err = txn->get(snapshot_full_key, &value, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| if (snapshot_pb && !snapshot_pb->ParseFromString(value)) { |
| LOG_ERROR("Failed to parse SnapshotPB") |
| .tag("instance_id", instance_id_) |
| .tag("snapshot_versionstamp", snapshot_versionstamp.to_string()) |
| .tag("key", hex(snapshot_full_key)); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::has_snapshot(bool* has, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return has_snapshot(txn.get(), has, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::has_snapshot(Transaction* txn, bool* has, bool snapshot) { |
| std::string snapshot_key = versioned::snapshot_full_key({instance_id_}); |
| std::string snapshot_full_key = encode_versioned_key(snapshot_key, Versionstamp::max()); |
| |
| std::unique_ptr<RangeGetIterator> it; |
| TxnErrorCode err = txn->get(snapshot_key, snapshot_full_key, &it, snapshot, 1); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| *has = it->has_next(); |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::has_snapshot_references(Versionstamp snapshot_version, |
| bool* has_references, bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return has_snapshot_references(txn.get(), snapshot_version, has_references, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::has_snapshot_references(Transaction* txn, Versionstamp snapshot_version, |
| bool* has_references, bool snapshot) { |
| std::string snapshot_ref_key_start = |
| versioned::snapshot_reference_key_prefix(instance_id_, snapshot_version); |
| std::string snapshot_ref_key_end = snapshot_ref_key_start + '\xFF'; |
| |
| std::unique_ptr<RangeGetIterator> it; |
| TxnErrorCode err = txn->get(snapshot_ref_key_start, snapshot_ref_key_end, &it, snapshot, 1); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| *has_references = it->has_next(); |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| int MetaReader::count_snapshot_references(Transaction* txn, Versionstamp snapshot_version, |
| bool snapshot) { |
| std::string snapshot_ref_key_start = |
| versioned::snapshot_reference_key_prefix(instance_id_, snapshot_version); |
| std::string snapshot_ref_key_end = snapshot_ref_key_start + '\xFF'; |
| |
| std::unique_ptr<RangeGetIterator> it; |
| TxnErrorCode err = txn->get(snapshot_ref_key_start, snapshot_ref_key_end, &it, snapshot, 100); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to get snapshot references for counting, snapshot_version=" |
| << snapshot_version.to_string(); |
| return 0; |
| } |
| |
| int count = 0; |
| while (it->has_next()) { |
| [[maybe_unused]] auto [key, value] = it->next(); |
| count++; |
| } |
| |
| return count; |
| } |
| |
| TxnErrorCode MetaReader::find_derived_instance_ids(Transaction* txn, Versionstamp snapshot_version, |
| std::vector<std::string>* out, bool snapshot) { |
| // Key format: ${prefix} + ${10-byte-versionstamp} + ${derived_instance_id} |
| std::string snapshot_ref_key_start = |
| versioned::snapshot_reference_key_prefix(instance_id_, snapshot_version); |
| std::string snapshot_ref_key_end = snapshot_ref_key_start + '\xFF'; |
| |
| std::unique_ptr<RangeGetIterator> it; |
| TxnErrorCode err = txn->get(snapshot_ref_key_start, snapshot_ref_key_end, &it, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to get snapshot references, snapshot_version=" |
| << snapshot_version.to_string() << " err=" << err; |
| return err; |
| } |
| |
| // Parse instance IDs from keys |
| std::unordered_set<std::string> unique_ids; |
| while (it->has_next()) { |
| auto [key, value] = it->next(); |
| |
| // Decode the snapshot reference key to extract ref_instance_id |
| std::string ref_instance_id; |
| std::string_view key_view = key; |
| if (versioned::decode_snapshot_ref_key(&key_view, nullptr, nullptr, &ref_instance_id) && |
| !ref_instance_id.empty()) { |
| unique_ids.insert(std::move(ref_instance_id)); |
| } else { |
| LOG(WARNING) << "failed to decode snapshot reference key, key=" << hex(key); |
| } |
| } |
| |
| // Convert set to vector |
| out->assign(unique_ids.begin(), unique_ids.end()); |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_load_rowset_metas( |
| int64_t tablet_id, std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>* rowset_metas, |
| bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_load_rowset_metas(txn.get(), tablet_id, rowset_metas, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_load_rowset_metas( |
| Transaction* txn, int64_t tablet_id, |
| std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>* rowset_metas, bool snapshot) { |
| std::string start_key = versioned::meta_rowset_load_key({instance_id_, tablet_id, 0}); |
| std::string end_key = versioned::meta_rowset_load_key( |
| {instance_id_, tablet_id, std::numeric_limits<int64_t>::max()}); |
| |
| versioned::ReadDocumentMessagesOptions options; |
| options.snapshot = snapshot; |
| options.snapshot_version = snapshot_version_; |
| options.exclude_begin_key = false; |
| options.exclude_end_key = false; |
| |
| auto iter = versioned::document_get_range<RowsetMetaCloudPB>(txn, start_key, end_key, options); |
| for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) { |
| auto&& [key, version, rowset_meta] = *kvp; |
| rowset_metas->emplace_back(std::move(rowset_meta), version); |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, version); |
| DCHECK(version < snapshot_version_) |
| << "version: " << version.to_string() |
| << ", snapshot_version: " << snapshot_version_.to_string(); |
| } |
| |
| if (!iter->is_valid()) { |
| LOG_ERROR("failed to get load rowset metas") |
| .tag("instance_id", instance_id_) |
| .tag("tablet_id", tablet_id) |
| .tag("error_code", iter->error_code()); |
| return iter->error_code(); |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::get_compact_rowset_metas( |
| int64_t tablet_id, std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>* rowset_metas, |
| bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return get_compact_rowset_metas(txn.get(), tablet_id, rowset_metas, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::get_compact_rowset_metas( |
| Transaction* txn, int64_t tablet_id, |
| std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>* rowset_metas, bool snapshot) { |
| std::string start_key = versioned::meta_rowset_compact_key({instance_id_, tablet_id, 0}); |
| std::string end_key = versioned::meta_rowset_compact_key( |
| {instance_id_, tablet_id, std::numeric_limits<int64_t>::max()}); |
| |
| versioned::ReadDocumentMessagesOptions options; |
| options.snapshot = snapshot; |
| options.snapshot_version = snapshot_version_; |
| options.exclude_begin_key = false; |
| options.exclude_end_key = false; |
| |
| auto iter = versioned::document_get_range<RowsetMetaCloudPB>(txn, start_key, end_key, options); |
| for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) { |
| auto&& [key, version, rowset_meta] = *kvp; |
| rowset_metas->emplace_back(std::move(rowset_meta), version); |
| min_read_versionstamp_ = std::min(min_read_versionstamp_, version); |
| DCHECK(version < snapshot_version_) |
| << "version: " << version.to_string() |
| << ", snapshot_version: " << snapshot_version_.to_string(); |
| } |
| |
| if (!iter->is_valid()) { |
| LOG_ERROR("failed to get compact rowset metas") |
| .tag("instance_id", instance_id_) |
| .tag("tablet_id", tablet_id) |
| .tag("error_code", iter->error_code()); |
| return iter->error_code(); |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| TxnErrorCode MetaReader::has_no_indexes(int64_t db_id, int64_t table_id, bool* no_indexes, |
| bool snapshot) { |
| DCHECK(txn_kv_) << "TxnKv must be set before calling"; |
| if (!txn_kv_) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return has_no_indexes(txn.get(), db_id, table_id, no_indexes, snapshot); |
| } |
| |
| TxnErrorCode MetaReader::has_no_indexes(Transaction* txn, int64_t db_id, int64_t table_id, |
| bool* no_indexes, bool snapshot) { |
| std::string start_key = versioned::index_inverted_key({instance_id_, db_id, table_id, 0}); |
| std::string end_key = versioned::index_inverted_key( |
| {instance_id_, db_id, table_id, std::numeric_limits<int64_t>::max()}); |
| |
| FullRangeGetOptions options; |
| options.prefetch = false; |
| options.exact_limit = 1; // We only need to know if there is at least one index |
| options.snapshot = snapshot; |
| auto it = txn->full_range_get(start_key, end_key, options); |
| if (it->has_next()) { |
| *no_indexes = false; |
| return TxnErrorCode::TXN_OK; |
| } else if (it->is_valid()) { |
| *no_indexes = true; |
| return TxnErrorCode::TXN_OK; |
| } else { |
| return it->error_code(); |
| } |
| } |
| |
| } // namespace doris::cloud |