| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "meta-service/meta_service_tablet_stats.h" |
| |
| #include <fmt/core.h> |
| #include <fmt/format.h> |
| #include <gen_cpp/cloud.pb.h> |
| |
| #include <cstdint> |
| #include <memory> |
| #include <string> |
| #include <string_view> |
| |
| #include "common/logging.h" |
| #include "common/util.h" |
| #include "meta-service/meta_service.h" |
| #include "meta-service/meta_service_helper.h" |
| #include "meta-store/clone_chain_reader.h" |
| #include "meta-store/keys.h" |
| #include "meta-store/meta_reader.h" |
| #include "meta-store/txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| #include "meta-store/versioned_value.h" |
| |
| namespace doris::cloud { |
| |
| void internal_get_tablet_stats(MetaServiceCode& code, std::string& msg, Transaction* txn, |
| const std::string& instance_id, const TabletIndexPB& idx, |
| TabletStatsPB& stats, TabletStats& detached_stats, bool snapshot) { |
| // clang-format off |
| auto begin_key = stats_tablet_key({instance_id, idx.table_id(), idx.index_id(), idx.partition_id(), idx.tablet_id()}); |
| auto begin_key_check = begin_key; |
| auto end_key = stats_tablet_key({instance_id, idx.table_id(), idx.index_id(), idx.partition_id(), idx.tablet_id() + 1}); |
| // clang-format on |
| std::vector<std::pair<std::string, std::string>> stats_kvs; |
| stats_kvs.reserve( |
| 7); // aggregate + data_size + num_rows + num_rowsets + num_segments + index_size + segment_size |
| |
| std::unique_ptr<RangeGetIterator> it; |
| do { |
| TxnErrorCode err = txn->get(begin_key, end_key, &it, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err, |
| idx.tablet_id()); |
| return; |
| } |
| while (it->has_next()) { |
| auto [k, v] = it->next(); |
| stats_kvs.emplace_back(std::string {k.data(), k.size()}, |
| std::string {v.data(), v.size()}); |
| } |
| begin_key = it->next_begin_key(); |
| } while (it->more()); |
| |
| if (stats_kvs.empty()) { |
| code = MetaServiceCode::TABLET_NOT_FOUND; |
| msg = fmt::format("tablet stats not found, tablet_id={}", idx.tablet_id()); |
| return; |
| } |
| |
| auto& [first_stats_key, v] = stats_kvs[0]; |
| // First key MUST be tablet stats key, the original non-detached one |
| DCHECK(first_stats_key == begin_key_check) |
| << hex(first_stats_key) << " vs " << hex(begin_key_check); |
| if (!stats.ParseFromArray(v.data(), v.size())) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("marformed tablet stats value, key={}", hex(first_stats_key)); |
| return; |
| } |
| // Parse split tablet stats |
| int ret = get_detached_tablet_stats(stats_kvs, detached_stats); |
| |
| if (ret != 0) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = fmt::format("marformed splitted tablet stats kv, key={}", hex(first_stats_key)); |
| return; |
| } |
| } |
| |
| int get_detached_tablet_stats(const std::vector<std::pair<std::string, std::string>>& stats_kvs, |
| TabletStats& detached_stats) { |
| bool unexpected_size = false; |
| // clang-format off |
| if (stats_kvs.size() != 7 // aggregated stats and 6 splitted stats: num_rowsets num_segs data_size num_rows index_size segment_size |
| && stats_kvs.size() != 5 // aggregated stats and 4 splitted stats: num_rowsets num_segs data_size num_rows |
| && stats_kvs.size() != 2 // aggregated stats and 1 splitted stats: num_rowsets |
| && stats_kvs.size() != 1 // aggregated stats only (nothing has been imported since created) |
| ) { |
| unexpected_size = true; |
| } |
| // clang-format on |
| std::stringstream ss; |
| for (size_t i = 1; i < stats_kvs.size(); ++i) { |
| std::string_view k(stats_kvs[i].first), v(stats_kvs[i].second); |
| k.remove_prefix(1); |
| constexpr size_t key_parts = 8; |
| std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; |
| if (decode_key(&k, &out) != 0 || out.size() != key_parts) [[unlikely]] { |
| LOG(WARNING) << "malformed tablet stats key. key=" << hex(k); |
| return -1; |
| } |
| |
| auto* suffix = std::get_if<std::string>(&std::get<0>(out.back())); |
| if (!suffix) [[unlikely]] { |
| LOG(WARNING) << "malformed tablet stats key. key=" << hex(k); |
| return -2; |
| } |
| |
| int64_t val = 0; |
| if (v.size() != sizeof(val)) [[unlikely]] { |
| LOG(WARNING) << "malformed tablet stats value v.size=" << v.size() << " key=" << hex(k); |
| return -3; |
| } |
| std::memcpy(&val, v.data(), sizeof(val)); |
| if constexpr (std::endian::native == std::endian::big) { |
| val = bswap_64(val); |
| } |
| |
| if (unexpected_size) ss << *suffix << " "; |
| |
| if (*suffix == STATS_KEY_SUFFIX_DATA_SIZE) { |
| detached_stats.data_size = val; |
| } else if (*suffix == STATS_KEY_SUFFIX_NUM_ROWS) { |
| detached_stats.num_rows = val; |
| } else if (*suffix == STATS_KEY_SUFFIX_NUM_ROWSETS) { |
| detached_stats.num_rowsets = val; |
| } else if (*suffix == STATS_KEY_SUFFIX_NUM_SEGS) { |
| detached_stats.num_segs = val; |
| } else if (*suffix == STATS_KEY_SUFFIX_INDEX_SIZE) { |
| detached_stats.index_size = val; |
| } else if (*suffix == STATS_KEY_SUFFIX_SEGMENT_SIZE) { |
| detached_stats.segment_size = val; |
| } else { |
| LOG(WARNING) << "unknown tablet stats suffix=" << *suffix << " key=" << hex(k); |
| } |
| } |
| |
| if (unexpected_size) { |
| DCHECK(false) << "unexpected tablet stats_kvs, it should be 1 or 2 or 5 or 7, size=" |
| << stats_kvs.size() << " suffix=" << ss.str(); |
| LOG_EVERY_N(WARNING, 100) |
| << "unexpected tablet stats_kvs, it should be 1 or 2 or 5 or 7, size=" |
| << stats_kvs.size() << " suffix=" << ss.str(); |
| } |
| |
| return 0; |
| } |
| |
| void merge_tablet_stats(TabletStatsPB& stats, const TabletStats& detached_stats) { |
| stats.set_data_size(stats.data_size() + detached_stats.data_size); |
| stats.set_num_rows(stats.num_rows() + detached_stats.num_rows); |
| stats.set_num_rowsets(stats.num_rowsets() + detached_stats.num_rowsets); |
| stats.set_num_segments(stats.num_segments() + detached_stats.num_segs); |
| stats.set_index_size(stats.index_size() + detached_stats.index_size); |
| stats.set_segment_size(stats.segment_size() + detached_stats.segment_size); |
| } |
| |
| void detach_tablet_stats(const TabletStatsPB& stats, TabletStats& detached_stats) { |
| detached_stats.data_size = stats.data_size(); |
| detached_stats.num_rows = stats.num_rows(); |
| detached_stats.num_rowsets = stats.num_rowsets(); |
| detached_stats.num_segs = stats.num_segments(); |
| detached_stats.index_size = stats.index_size(); |
| detached_stats.segment_size = stats.segment_size(); |
| } |
| |
| void internal_get_tablet_stats(MetaServiceCode& code, std::string& msg, Transaction* txn, |
| const std::string& instance_id, const TabletIndexPB& idx, |
| TabletStatsPB& stats, bool snapshot) { |
| TabletStats detached_stats; |
| internal_get_tablet_stats(code, msg, txn, instance_id, idx, stats, detached_stats, snapshot); |
| if (code == MetaServiceCode::OK) { |
| merge_tablet_stats(stats, detached_stats); |
| } |
| } |
| |
| void internal_get_load_tablet_stats(MetaServiceCode& code, std::string& msg, |
| CloneChainReader& meta_reader, Transaction* txn, |
| const std::string& instance_id, const TabletIndexPB& tablet_idx, |
| TabletStatsPB& stats, bool snapshot) { |
| int64_t tablet_id = tablet_idx.tablet_id(); |
| Versionstamp versionstamp; |
| |
| // Try to read existing versioned tablet stats |
| TxnErrorCode err = |
| meta_reader.get_tablet_load_stats(txn, tablet_id, &stats, &versionstamp, snapshot); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| // If versioned stats doesn't exist, read from single version |
| TabletStatsPB compact_stats; |
| TabletStats detached_stats; |
| internal_get_tablet_stats(code, msg, txn, instance_id, tablet_idx, compact_stats, |
| detached_stats, snapshot); |
| if (code == MetaServiceCode::OK) { |
| // Only the detached stats are valid for load tablet stats. |
| merge_tablet_stats(stats, detached_stats); |
| } |
| } else if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get versioned tablet stats, err={}", err); |
| LOG(WARNING) << msg << " tablet_id=" << tablet_id; |
| return; |
| } |
| } |
| |
| void internal_get_load_tablet_stats_batch( |
| MetaServiceCode& code, std::string& msg, CloneChainReader& meta_reader, Transaction* txn, |
| const std::string& instance_id, |
| const std::unordered_map<int64_t, TabletIndexPB>& tablet_indexes, |
| std::unordered_map<int64_t, TabletStatsPB>* tablet_stats, bool snapshot) { |
| if (tablet_indexes.empty()) { |
| return; |
| } |
| |
| // Collect all tablet_ids |
| std::vector<int64_t> tablet_ids; |
| tablet_ids.reserve(tablet_indexes.size()); |
| for (const auto& [tablet_id, _] : tablet_indexes) { |
| tablet_ids.push_back(tablet_id); |
| } |
| |
| // Batch get versioned tablet load stats |
| std::unordered_map<int64_t, TabletStatsPB> versioned_stats; |
| std::unordered_map<int64_t, Versionstamp> versionstamps; |
| TxnErrorCode err = meta_reader.get_tablet_load_stats(txn, tablet_ids, &versioned_stats, |
| &versionstamps, snapshot); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to batch get versioned tablet stats, err={}", err); |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| // For tablets found in versioned stats, add them to output |
| for (const auto& [tablet_id, stats] : versioned_stats) { |
| (*tablet_stats)[tablet_id] = stats; |
| } |
| |
| // For tablets not found in versioned stats, fall back to single version detached stats |
| std::vector<int64_t> fallback_tablet_ids; |
| for (int64_t tablet_id : tablet_ids) { |
| if (!versioned_stats.contains(tablet_id)) { |
| fallback_tablet_ids.push_back(tablet_id); |
| } |
| } |
| |
| if (!fallback_tablet_ids.empty()) { |
| LOG(INFO) << "fallback to single version detached stats for " << fallback_tablet_ids.size() |
| << " tablets"; |
| |
| // Fall back to single version for each tablet |
| for (int64_t tablet_id : fallback_tablet_ids) { |
| auto it = tablet_indexes.find(tablet_id); |
| if (it == tablet_indexes.end()) { |
| continue; |
| } |
| const TabletIndexPB& tablet_idx = it->second; |
| |
| TabletStatsPB compact_stats; |
| TabletStats detached_stats; |
| internal_get_tablet_stats(code, msg, txn, instance_id, tablet_idx, compact_stats, |
| detached_stats, snapshot); |
| if (code != MetaServiceCode::OK) { |
| LOG(WARNING) << "failed to get detached tablet stats for fallback, tablet_id=" |
| << tablet_id << " code=" << code << " msg=" << msg; |
| return; |
| } |
| |
| // Only the detached stats are valid for load tablet stats. |
| TabletStatsPB stats_pb; |
| merge_tablet_stats(stats_pb, detached_stats); |
| (*tablet_stats)[tablet_id] = std::move(stats_pb); |
| } |
| } |
| } |
| |
| void internal_get_load_tablet_stats_batch(MetaServiceCode& code, std::string& msg, |
| CloneChainReader& meta_reader, Transaction* txn, |
| const std::string& instance_id, |
| const std::map<int64_t, TabletIndexPB>& tablet_indexes, |
| std::unordered_map<int64_t, TabletStatsPB>* tablet_stats, |
| bool snapshot) { |
| // Convert std::map to std::unordered_map and delegate to the main implementation |
| std::unordered_map<int64_t, TabletIndexPB> unordered_indexes(tablet_indexes.begin(), |
| tablet_indexes.end()); |
| internal_get_load_tablet_stats_batch(code, msg, meta_reader, txn, instance_id, |
| unordered_indexes, tablet_stats, snapshot); |
| } |
| |
| MetaServiceResponseStatus parse_fix_tablet_stats_param( |
| std::shared_ptr<ResourceManager> resource_mgr, const std::string& table_id_str, |
| const std::string& cloud_unique_id_str, const std::string& tablet_id_str, int64_t& table_id, |
| std::string& instance_id, int64_t& tablet_id) { |
| MetaServiceCode code = MetaServiceCode::OK; |
| std::string msg; |
| MetaServiceResponseStatus st; |
| st.set_code(MetaServiceCode::OK); |
| |
| // parse params |
| try { |
| table_id = std::stoll(table_id_str); |
| } catch (...) { |
| st.set_code(MetaServiceCode::INVALID_ARGUMENT); |
| st.set_msg("Invalid table_id, table_id: " + table_id_str); |
| return st; |
| } |
| |
| if (!tablet_id_str.empty()) { |
| try { |
| tablet_id = std::stoll(tablet_id_str); |
| } catch (...) { |
| st.set_code(MetaServiceCode::INVALID_ARGUMENT); |
| st.set_msg("Invalid tablet_id, tablet_id: " + tablet_id_str); |
| return st; |
| } |
| } |
| |
| instance_id = get_instance_id(resource_mgr, cloud_unique_id_str); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id_str; |
| st.set_code(code); |
| st.set_msg(msg); |
| return st; |
| } |
| return st; |
| } |
| |
| MetaServiceResponseStatus fix_tablet_stats_internal( |
| std::shared_ptr<TxnKv> txn_kv, std::pair<std::string, std::string>& key_pair, |
| std::vector<std::shared_ptr<TabletStatsPB>>& tablet_stat_shared_ptr_vec_batch, |
| const std::string& instance_id, size_t batch_size) { |
| std::unique_ptr<Transaction> txn; |
| MetaServiceResponseStatus st; |
| st.set_code(MetaServiceCode::OK); |
| MetaServiceCode code = MetaServiceCode::OK; |
| std::unique_ptr<RangeGetIterator> it; |
| std::vector<std::shared_ptr<TabletStatsPB>> tmp_tablet_stat_vec; |
| |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| st.set_code(cast_as<ErrCategory::CREATE>(err)); |
| st.set_msg("failed to create txn"); |
| return st; |
| } |
| |
| // read tablet stats |
| err = txn->get(key_pair.first, key_pair.second, &it, true); |
| if (err != TxnErrorCode::TXN_OK) { |
| st.set_code(cast_as<ErrCategory::READ>(err)); |
| st.set_msg(fmt::format("failed to get tablet stats, err={} ", err)); |
| return st; |
| } |
| |
| size_t tablet_cnt = 0; |
| while (it->has_next() && tablet_cnt < batch_size) { |
| auto [k, v] = it->next(); |
| key_pair.first = k; |
| auto k1 = k; |
| k1.remove_prefix(1); |
| std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; |
| decode_key(&k1, &out); |
| |
| // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} -> TabletStatsPB |
| if (out.size() == 7) { |
| tablet_cnt++; |
| TabletStatsPB tablet_stat; |
| tablet_stat.ParseFromArray(v.data(), v.size()); |
| tmp_tablet_stat_vec.emplace_back(std::make_shared<TabletStatsPB>(tablet_stat)); |
| } |
| } |
| if (it->has_next()) { |
| key_pair.first = it->next().first; |
| } |
| |
| for (const auto& tablet_stat_ptr : tmp_tablet_stat_vec) { |
| GetRowsetResponse resp; |
| std::string msg; |
| // get rowsets in tablet and accumulate disk size |
| internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id, |
| tablet_stat_ptr->idx().tablet_id(), code, msg, &resp); |
| if (code != MetaServiceCode::OK) { |
| st.set_code(code); |
| st.set_msg(msg); |
| return st; |
| } |
| int64_t total_disk_size = 0; |
| int64_t index_disk_size = 0; |
| int64_t data_disk_size = 0; |
| for (const auto& rs_meta : resp.rowset_meta()) { |
| total_disk_size += rs_meta.total_disk_size(); |
| index_disk_size += rs_meta.index_disk_size(); |
| data_disk_size += rs_meta.data_disk_size(); |
| } |
| |
| // set new disk size to tabletPB and write it back |
| TabletStatsPB tablet_stat; |
| tablet_stat.CopyFrom(*tablet_stat_ptr); |
| tablet_stat.set_data_size(total_disk_size); |
| tablet_stat.set_index_size(index_disk_size); |
| tablet_stat.set_segment_size(data_disk_size); |
| // record tablet stats batch |
| tablet_stat_shared_ptr_vec_batch.emplace_back(std::make_shared<TabletStatsPB>(tablet_stat)); |
| std::string tablet_stat_key; |
| std::string tablet_stat_value; |
| tablet_stat_key = stats_tablet_key( |
| {instance_id, tablet_stat.idx().table_id(), tablet_stat.idx().index_id(), |
| tablet_stat.idx().partition_id(), tablet_stat.idx().tablet_id()}); |
| if (!tablet_stat.SerializeToString(&tablet_stat_value)) { |
| st.set_code(MetaServiceCode::PROTOBUF_SERIALIZE_ERR); |
| st.set_msg("failed to serialize tablet stat"); |
| return st; |
| } |
| txn->put(tablet_stat_key, tablet_stat_value); |
| |
| // read num segs |
| // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "num_segs" -> int64 |
| std::string tablet_stat_num_segs_key; |
| stats_tablet_num_segs_key( |
| {instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(), |
| tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()}, |
| &tablet_stat_num_segs_key); |
| int64_t tablet_stat_num_segs = 0; |
| std::string tablet_stat_num_segs_value(sizeof(tablet_stat_num_segs), '\0'); |
| err = txn->get(tablet_stat_num_segs_key, &tablet_stat_num_segs_value); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| st.set_code(cast_as<ErrCategory::READ>(err)); |
| } |
| if (tablet_stat_num_segs_value.size() != sizeof(tablet_stat_num_segs)) [[unlikely]] { |
| LOG(WARNING) << " malformed tablet stats value v.size=" |
| << tablet_stat_num_segs_value.size() |
| << " value=" << hex(tablet_stat_num_segs_value); |
| } |
| std::memcpy(&tablet_stat_num_segs, tablet_stat_num_segs_value.data(), |
| sizeof(tablet_stat_num_segs)); |
| if constexpr (std::endian::native == std::endian::big) { |
| tablet_stat_num_segs = bswap_64(tablet_stat_num_segs); |
| } |
| |
| if (tablet_stat_num_segs > 0) { |
| // set tablet stats data size = 0 |
| // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "data_size" -> int64 |
| std::string tablet_stat_data_size_key; |
| stats_tablet_data_size_key( |
| {instance_id, tablet_stat.idx().table_id(), tablet_stat.idx().index_id(), |
| tablet_stat.idx().partition_id(), tablet_stat.idx().tablet_id()}, |
| &tablet_stat_data_size_key); |
| int64_t tablet_stat_data_size = 0; |
| std::string tablet_stat_data_size_value(sizeof(tablet_stat_data_size), '\0'); |
| memcpy(tablet_stat_data_size_value.data(), &tablet_stat_data_size, |
| sizeof(tablet_stat_data_size)); |
| txn->put(tablet_stat_data_size_key, tablet_stat_data_size_value); |
| // set tablet stats index size = 0 |
| // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "index_size" -> int64 |
| std::string tablet_stat_index_size_key; |
| stats_tablet_index_size_key( |
| {instance_id, tablet_stat.idx().table_id(), tablet_stat.idx().index_id(), |
| tablet_stat.idx().partition_id(), tablet_stat.idx().tablet_id()}, |
| &tablet_stat_index_size_key); |
| int64_t tablet_stat_index_size = 0; |
| std::string tablet_stat_index_size_value(sizeof(tablet_stat_index_size), '\0'); |
| memcpy(tablet_stat_index_size_value.data(), &tablet_stat_index_size, |
| sizeof(tablet_stat_index_size)); |
| txn->put(tablet_stat_index_size_key, tablet_stat_index_size_value); |
| // set tablet stats segment size = 0 |
| // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "segment_size" -> int64 |
| std::string tablet_stat_segment_size_key; |
| stats_tablet_segment_size_key( |
| {instance_id, tablet_stat.idx().table_id(), tablet_stat.idx().index_id(), |
| tablet_stat.idx().partition_id(), tablet_stat.idx().tablet_id()}, |
| &tablet_stat_segment_size_key); |
| int64_t tablet_stat_segment_size = 0; |
| std::string tablet_stat_segment_size_value(sizeof(tablet_stat_segment_size), '\0'); |
| memcpy(tablet_stat_segment_size_value.data(), &tablet_stat_segment_size, |
| sizeof(tablet_stat_segment_size)); |
| txn->put(tablet_stat_segment_size_key, tablet_stat_segment_size_value); |
| } |
| } |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| st.set_code(cast_as<ErrCategory::COMMIT>(err)); |
| st.set_msg("failed to commit txn"); |
| return st; |
| } |
| return st; |
| } |
| |
| MetaServiceResponseStatus check_new_tablet_stats( |
| std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, |
| const std::vector<std::shared_ptr<TabletStatsPB>>& tablet_stat_shared_ptr_vec_batch) { |
| std::unique_ptr<Transaction> txn; |
| MetaServiceResponseStatus st; |
| st.set_code(MetaServiceCode::OK); |
| |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| st.set_code(cast_as<ErrCategory::CREATE>(err)); |
| st.set_msg("failed to create txn"); |
| return st; |
| } |
| |
| for (const auto& tablet_stat_ptr : tablet_stat_shared_ptr_vec_batch) { |
| // check tablet stats |
| std::string tablet_stat_key; |
| std::string tablet_stat_value; |
| tablet_stat_key = stats_tablet_key( |
| {instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(), |
| tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()}); |
| err = txn->get(tablet_stat_key, &tablet_stat_value); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| st.set_code(cast_as<ErrCategory::READ>(err)); |
| return st; |
| } |
| TabletStatsPB tablet_stat_check; |
| tablet_stat_check.ParseFromArray(tablet_stat_value.data(), tablet_stat_value.size()); |
| if (tablet_stat_check.DebugString() != tablet_stat_ptr->DebugString() && |
| // If anyone data size of tablet_stat_check and tablet_stat_ptr is twice bigger than another, |
| // we need to rewrite it this tablet_stat. |
| (tablet_stat_check.data_size() > 2 * tablet_stat_ptr->data_size() || |
| tablet_stat_ptr->data_size() > 2 * tablet_stat_check.data_size())) { |
| LOG_WARNING("[fix tablet stats]:tablet stats check failed") |
| .tag("tablet stat", tablet_stat_ptr->DebugString()) |
| .tag("check tabelt stat", tablet_stat_check.DebugString()); |
| } |
| |
| // check data size |
| std::string tablet_stat_data_size_key; |
| stats_tablet_data_size_key( |
| {instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(), |
| tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()}, |
| &tablet_stat_data_size_key); |
| int64_t tablet_stat_data_size = 0; |
| std::string tablet_stat_data_size_value(sizeof(tablet_stat_data_size), '\0'); |
| err = txn->get(tablet_stat_data_size_key, &tablet_stat_data_size_value); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| st.set_code(cast_as<ErrCategory::READ>(err)); |
| return st; |
| } |
| int64_t tablet_stat_data_size_check; |
| |
| if (tablet_stat_data_size_value.size() != sizeof(tablet_stat_data_size_check)) |
| [[unlikely]] { |
| LOG(WARNING) << " malformed tablet stats value v.size=" |
| << tablet_stat_data_size_value.size() |
| << " value=" << hex(tablet_stat_data_size_value); |
| } |
| std::memcpy(&tablet_stat_data_size_check, tablet_stat_data_size_value.data(), |
| sizeof(tablet_stat_data_size_check)); |
| if constexpr (std::endian::native == std::endian::big) { |
| tablet_stat_data_size_check = bswap_64(tablet_stat_data_size_check); |
| } |
| if (tablet_stat_data_size_check != tablet_stat_data_size && |
| // ditto |
| (tablet_stat_data_size_check > 2 * tablet_stat_data_size || |
| tablet_stat_data_size > 2 * tablet_stat_data_size_check)) { |
| LOG_WARNING("[fix tablet stats]:data size check failed") |
| .tag("data size", tablet_stat_data_size) |
| .tag("check data size", tablet_stat_data_size_check); |
| } |
| |
| // check index size |
| std::string tablet_stat_index_size_key; |
| stats_tablet_index_size_key( |
| {instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(), |
| tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()}, |
| &tablet_stat_index_size_key); |
| int64_t tablet_stat_index_size = 0; |
| std::string tablet_stat_index_size_value(sizeof(tablet_stat_index_size), '\0'); |
| err = txn->get(tablet_stat_index_size_key, &tablet_stat_index_size_value); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| st.set_code(cast_as<ErrCategory::READ>(err)); |
| return st; |
| } |
| int64_t tablet_stat_index_size_check; |
| |
| if (tablet_stat_index_size_value.size() != sizeof(tablet_stat_index_size_check)) |
| [[unlikely]] { |
| LOG(WARNING) << " malformed tablet stats value v.index_size=" |
| << tablet_stat_index_size_value.size() |
| << " value=" << hex(tablet_stat_index_size_value); |
| } |
| std::memcpy(&tablet_stat_index_size_check, tablet_stat_index_size_value.data(), |
| sizeof(tablet_stat_index_size_check)); |
| if constexpr (std::endian::native == std::endian::big) { |
| tablet_stat_index_size_check = bswap_64(tablet_stat_index_size_check); |
| } |
| if (tablet_stat_index_size_check != tablet_stat_index_size && |
| // ditto |
| (tablet_stat_index_size_check > 2 * tablet_stat_index_size || |
| tablet_stat_index_size > 2 * tablet_stat_index_size_check)) { |
| LOG_WARNING("[fix tablet stats]:index size check failed") |
| .tag("index size", tablet_stat_index_size) |
| .tag("check index size", tablet_stat_index_size_check); |
| } |
| |
| // check data size |
| std::string tablet_stat_segment_size_key; |
| stats_tablet_segment_size_key( |
| {instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(), |
| tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()}, |
| &tablet_stat_segment_size_key); |
| int64_t tablet_stat_segment_size = 0; |
| std::string tablet_stat_segment_size_value(sizeof(tablet_stat_segment_size), '\0'); |
| err = txn->get(tablet_stat_segment_size_key, &tablet_stat_segment_size_value); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| st.set_code(cast_as<ErrCategory::READ>(err)); |
| return st; |
| } |
| int64_t tablet_stat_segment_size_check; |
| |
| if (tablet_stat_segment_size_value.size() != sizeof(tablet_stat_segment_size_check)) |
| [[unlikely]] { |
| LOG(WARNING) << " malformed tablet stats value v.segment_size=" |
| << tablet_stat_segment_size_value.size() |
| << " value=" << hex(tablet_stat_segment_size_value); |
| } |
| std::memcpy(&tablet_stat_segment_size_check, tablet_stat_segment_size_value.data(), |
| sizeof(tablet_stat_segment_size_check)); |
| if constexpr (std::endian::native == std::endian::big) { |
| tablet_stat_segment_size_check = bswap_64(tablet_stat_segment_size_check); |
| } |
| if (tablet_stat_segment_size_check != tablet_stat_segment_size && |
| // ditto |
| (tablet_stat_segment_size_check > 2 * tablet_stat_segment_size || |
| tablet_stat_segment_size > 2 * tablet_stat_segment_size_check)) { |
| LOG_WARNING("[fix tablet stats]:segment size check failed") |
| .tag("segment size", tablet_stat_segment_size) |
| .tag("check segment size", tablet_stat_segment_size_check); |
| } |
| } |
| |
| return st; |
| } |
| |
| std::pair<MetaServiceCode, std::string> fix_versioned_tablet_stats_internal( |
| TxnKv* txn_kv, const std::string& instance_id, const TabletIndexPB& tablet_idx, |
| bool is_versioned_read, bool is_versioned_write, ResourceManager* resource_mgr) { |
| int64_t tablet_id = tablet_idx.tablet_id(); |
| std::unique_ptr<Transaction> txn; |
| MetaServiceCode code = MetaServiceCode::OK; |
| std::string msg; |
| |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| return {code, msg}; |
| } |
| |
| TabletStatsPB original_tablet_stat; |
| TabletStatsPB existing_compact_stats; |
| TabletStatsPB existing_load_stats; |
| Versionstamp compact_versionstamp; |
| Versionstamp load_versionstamp; |
| GetRowsetResponse resp; |
| |
| CloneChainReader meta_reader(instance_id, resource_mgr); |
| if (is_versioned_read) { |
| // Get existing compact stats |
| err = meta_reader.get_tablet_compact_stats(txn.get(), tablet_id, &existing_compact_stats, |
| &compact_versionstamp, true); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get versioned compact stats, tablet_id={}, err={}", |
| tablet_id, err); |
| return {code, msg}; |
| } |
| |
| // Get existing load stats |
| err = meta_reader.get_tablet_load_stats(txn.get(), tablet_id, &existing_load_stats, |
| &load_versionstamp, true); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get versioned load stats, tablet_id={}, err={}", tablet_id, |
| err); |
| return {code, msg}; |
| } |
| MetaReader::merge_tablet_stats(existing_compact_stats, existing_load_stats, |
| &original_tablet_stat); |
| |
| std::vector<RowsetMetaCloudPB> rowset_metas; |
| int64_t start = 0, end = std::numeric_limits<int64_t>::max() - 1; |
| err = meta_reader.get_rowset_metas(txn.get(), tablet_id, start, end, &rowset_metas); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get versioned rowset, err={}, tablet_id={}", err, |
| tablet_id); |
| return {code, msg}; |
| } |
| |
| std::move(rowset_metas.begin(), rowset_metas.end(), |
| google::protobuf::RepeatedPtrFieldBackInserter(resp.mutable_rowset_meta())); |
| } else { |
| internal_get_tablet_stats(code, msg, txn.get(), instance_id, tablet_idx, |
| original_tablet_stat, true); |
| if (code != MetaServiceCode::OK) { |
| return {code, msg}; |
| } |
| // get rowsets in tablet and accumulate disk size |
| internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id, |
| tablet_id, code, msg, &resp); |
| if (code != MetaServiceCode::OK) { |
| return {code, msg}; |
| } |
| } |
| |
| int64_t table_id = original_tablet_stat.idx().table_id(); |
| int64_t index_id = original_tablet_stat.idx().index_id(); |
| int64_t partition_id = original_tablet_stat.idx().partition_id(); |
| |
| int64_t total_disk_size = 0; |
| int64_t index_disk_size = 0; |
| int64_t data_disk_size = 0; |
| for (const auto& rs_meta : resp.rowset_meta()) { |
| total_disk_size += rs_meta.total_disk_size(); |
| index_disk_size += rs_meta.index_disk_size(); |
| data_disk_size += rs_meta.data_disk_size(); |
| } |
| |
| // set new disk size to tabletPB and write it back |
| TabletStatsPB tablet_stat; |
| tablet_stat.CopyFrom(original_tablet_stat); |
| tablet_stat.set_data_size(total_disk_size); |
| tablet_stat.set_index_size(index_disk_size); |
| tablet_stat.set_segment_size(data_disk_size); |
| |
| // Write single version stats |
| std::string tablet_stat_key; |
| std::string tablet_stat_value; |
| tablet_stat_key = stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| if (!tablet_stat.SerializeToString(&tablet_stat_value)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize tablet stat"; |
| return {code, msg}; |
| } |
| txn->put(tablet_stat_key, tablet_stat_value); |
| |
| std::string num_segs_key = |
| stats_tablet_num_segs_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| std::string num_rows_key = |
| stats_tablet_num_rows_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| std::string num_rowsets_key = stats_tablet_num_rowsets_key( |
| {instance_id, table_id, index_id, partition_id, tablet_id}); |
| std::string data_size_key = |
| stats_tablet_data_size_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| std::string index_size_key = |
| stats_tablet_index_size_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| std::string segment_size_key = stats_tablet_segment_size_key( |
| {instance_id, table_id, index_id, partition_id, tablet_id}); |
| txn->remove(num_segs_key); |
| txn->remove(num_rows_key); |
| txn->remove(num_rowsets_key); |
| txn->remove(data_size_key); |
| txn->remove(index_size_key); |
| txn->remove(segment_size_key); |
| |
| if (is_versioned_write) { |
| // Write compact stats (aggregate stats with accurate disk sizes) |
| std::string compact_stats_key = |
| versioned::tablet_compact_stats_key({instance_id, tablet_id}); |
| TabletStatsPB compact_stats = tablet_stat; // Use the fixed stats with accurate disk sizes |
| versioned_put(txn.get(), compact_stats_key, compact_versionstamp, tablet_stat_value); |
| LOG(INFO) << "put versioned tablet compact stats key=" << hex(compact_stats_key) |
| << " tablet_id=" << tablet_id << " with existing versionstamp"; |
| |
| // Write load stats (detached stats, set to 0 since we recalculated from rowsets) |
| std::string load_stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id}); |
| TabletStatsPB load_stats; |
| load_stats.mutable_idx()->CopyFrom(tablet_stat.idx()); |
| |
| std::string load_stats_value; |
| if (!load_stats.SerializeToString(&load_stats_value)) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| msg = "failed to serialize load stats"; |
| return {code, msg}; |
| } |
| |
| // Overwrite with existing versionstamp |
| versioned_put(txn.get(), load_stats_key, load_versionstamp, load_stats_value); |
| LOG(INFO) << "put versioned tablet load stats key=" << hex(load_stats_key) |
| << " tablet_id=" << tablet_id << " with existing versionstamp"; |
| } |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = "failed to commit txn"; |
| return {code, msg}; |
| } |
| |
| return {MetaServiceCode::OK, ""}; |
| } |
| |
| } // namespace doris::cloud |