blob: 038c26741eaa3ee33e3a960638aac580e6ff7e50 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_meta_mgr.h"
#include <brpc/channel.h>
#include <brpc/controller.h>
#include <brpc/errno.pb.h>
#include <bthread/bthread.h>
#include <bthread/condition_variable.h>
#include <bthread/mutex.h>
#include <gen_cpp/PlanNodes_types.h>
#include <glog/logging.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <random>
#include <shared_mutex>
#include <string>
#include <type_traits>
#include <vector>
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "cloud/delete_bitmap_file_reader.h"
#include "cloud/delete_bitmap_file_writer.h"
#include "cloud/pb_convert.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/cloud.pb.h"
#include "gen_cpp/olap_file.pb.h"
#include "io/fs/obj_storage_client.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/network_util.h"
#include "util/s3_util.h"
#include "util/thrift_rpc_helper.h"
namespace doris::cloud {
#include "common/compile_check_begin.h"
using namespace ErrorCode;
/// Trampoline with the `void* (*)(void*)` shape expected by `bthread_start_background`.
///
/// @param arg Pointer to a heap-allocated `std::function<void()>`. Ownership is transferred to
///            this function: the callable is invoked once and then destroyed.
/// @return Always `nullptr`.
void* run_bthread_work(void* arg) {
    // Adopt ownership before invoking, so the closure is released even if the callable throws
    // (callers in this file also invoke this function inline when bthread creation fails).
    std::unique_ptr<std::function<void()>> work(static_cast<std::function<void()>*>(arg));
    (*work)();
    return nullptr;
}
/// Runs every task in `tasks` on a background bthread, keeping at most `concurrency` of them in
/// flight, and blocks until all dispatched tasks have finished.
///
/// Once any task reports a non-OK status, no further tasks are dispatched; tasks that are already
/// running are still awaited. If a bthread cannot be started, the task is executed inline on the
/// calling thread.
///
/// @param tasks       Tasks to execute; must outlive this call (they are referenced, not copied).
/// @param concurrency Maximum number of tasks in flight. Values below 1 are treated as 1.
/// @return OK if every executed task succeeded, otherwise the status of a failed task (when
///         several fail, the one recorded last).
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency) {
    if (tasks.empty()) {
        return Status::OK();
    }
    // A non-positive limit would make the dispatcher wait for a free slot that no running task
    // could ever release; fall back to running the tasks one at a time instead of hanging.
    const int max_running = std::max(concurrency, 1);

    bthread::Mutex lock;
    bthread::ConditionVariable cond;
    Status status; // Guard by lock
    int count = 0; // Guard by lock

    for (const auto& task : tasks) {
        {
            std::unique_lock lk(lock);
            // Wait until there are available slots
            while (status.ok() && count >= max_running) {
                cond.wait(lk);
            }
            if (!status.ok()) {
                break;
            }
            // Increase running task count
            ++count;
        }
        // dispatch task into bthreads
        auto* fn = new std::function<void()>([&, &task = task] {
            auto st = task();
            {
                // Notify while holding the lock: `lock` and `cond` live on the caller's stack and
                // may be destroyed as soon as the waiter observes `count == 0`.
                std::lock_guard lk(lock);
                --count;
                if (!st.ok()) {
                    std::swap(st, status);
                }
                cond.notify_one();
            }
        });
        bthread_t bthread_id;
        if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) {
            // Could not create a bthread: run the task synchronously on this thread.
            run_bthread_work(fn);
        }
    }

    // Wait until all running tasks have done
    {
        std::unique_lock lk(lock);
        while (count > 0) {
            cond.wait(lk);
        }
    }
    return status;
}
/// Asynchronous flavour of `bthread_fork_join`: the whole fork-join is executed on a background
/// bthread and its final status is delivered through `*fut`.
///
/// @param tasks       Tasks to run; moved into the background closure.
/// @param concurrency Forwarded to the synchronous `bthread_fork_join`.
/// @param fut         Receives the future that will carry the combined status.
/// @return OK when the background bthread was started, InternalError otherwise.
Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency,
                         std::future<Status>* fut) {
    // A std::function requires a copyable callable, so the promise is held through a shared_ptr
    // on the heap instead of being stored in the closure by value.
    auto promise = std::make_shared<std::promise<Status>>();
    *fut = promise->get_future();

    auto runner = [tasks = std::move(tasks), concurrency,
                   promise = std::move(promise)]() mutable {
        promise->set_value(bthread_fork_join(tasks, concurrency));
    };
    auto* work = new std::function<void()>(std::move(runner));

    bthread_t tid;
    const int rc = bthread_start_background(&tid, nullptr, run_bthread_work, work);
    if (rc == 0) {
        // `run_bthread_work` now owns `work` and frees it after running.
        return Status::OK();
    }
    delete work;
    return Status::InternalError<false>("failed to create bthread");
}
namespace {
// Value passed to brpc::Controller::set_max_retry() for RPCs issued through retry_rpc().
constexpr int kBrpcRetryTimes = 3;
// Latency of the get_rowset RPC in microseconds, fed from cntl.latency_us() in
// sync_tablet_rowsets_unlocked().
bvar::LatencyRecorder _get_rowset_latency("doris_cloud_meta_mgr_get_rowset");
// NOTE(review): the variable name says "commit_txn_resp_redirect" while the exported bvar name is
// "cloud_table_stats_report_latency" — confirm which of the two is intended.
bvar::LatencyRecorder g_cloud_commit_txn_resp_redirect_latency("cloud_table_stats_report_latency");
// Incremented by retry_rpc() each time an attempt fails with brpc::ERPCTIMEDOUT.
bvar::Adder<uint64_t> g_cloud_meta_mgr_rpc_timeout_count("cloud_meta_mgr_rpc_timeout_count");
// Windowed view over the timeout counter above (window size 30; presumably seconds — confirm).
bvar::Window<bvar::Adder<uint64_t>> g_cloud_ms_rpc_timeout_count_window(
"cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count, 30);
// Not referenced in the visible part of this file; judging by the name it records back-off sleep
// time while acquiring the MoW delete-bitmap lock — verify against its users.
bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time(
"cloud_be_mow_get_dbm_lock_backoff_sleep_time");
// Not referenced in the visible part of this file; presumably counts version holes filled by
// fill_version_holes() — verify against its users.
bvar::Adder<uint64_t> g_cloud_version_hole_filled_count("cloud_version_hole_filled_count");
/// Caches a brpc stub to the meta service and decides when the underlying channel has to be
/// re-created: when the connection age deadline passed, when the connection was idle for too
/// long, or when a caller flagged it unhealthy and reconnecting is currently permitted.
/// Instances live in a process-wide pool that is handed out round-robin by `get_proxy`.
class MetaServiceProxy {
public:
    /// Selects the next proxy of the pool and stores its address in `*proxy`.
    ///
    /// @return The status of obtaining a stub from the selected proxy.
    static Status get_proxy(MetaServiceProxy** proxy) {
        // The 'stub' is a useless parameter, added only to reuse the `get_pooled_client` function.
        std::shared_ptr<MetaService_Stub> stub;
        return get_pooled_client(&stub, proxy);
    }

    /// Flags the connection as possibly broken so that a later `get` may rebuild the channel.
    void set_unhealthy() {
        std::unique_lock lock(_mutex);
        maybe_unhealthy = true;
    }

    /// Returns true when the proxy is flagged unhealthy and the oldest of the last three recorded
    /// reconnect timestamps is more than `config::meta_service_rpc_reconnect_interval_ms` before
    /// `now`. Reads members without locking; `get` calls it while holding `_mutex`.
    bool need_reconn(long now) {
        return maybe_unhealthy && ((now - last_reconn_time_ms.front()) >
                                   config::meta_service_rpc_reconnect_interval_ms);
    }

    /// Returns the cached stub if it is still usable, otherwise creates a new channel and stub,
    /// caches them and returns the new stub.
    ///
    /// @param stub Receives the stub to use for the next RPC.
    /// @return OK on success, or the channel initialization error.
    Status get(std::shared_ptr<MetaService_Stub>* stub) {
        using namespace std::chrono;

        auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        {
            std::shared_lock lock(_mutex);
            if (_deadline_ms >= now && !is_idle_timeout(now) && !need_reconn(now)) {
                _last_access_at_ms.store(now, std::memory_order_relaxed);
                *stub = _stub;
                return Status::OK();
            }
        }

        auto channel = std::make_unique<brpc::Channel>();
        Status s = init_channel(channel.get());
        if (!s.ok()) [[unlikely]] {
            return s;
        }

        *stub = std::make_shared<MetaService_Stub>(channel.release(),
                                                   google::protobuf::Service::STUB_OWNS_CHANNEL);

        // Connection age only works without list endpoint, and only when it is enabled by config.
        // In every other case the stub must never expire by age; using `now` as the deadline
        // would force a brand-new channel on practically every call.
        long deadline = std::numeric_limits<long>::max();
        if (!is_meta_service_endpoint_list() &&
            config::meta_service_connection_age_base_seconds > 0) {
            // Randomize the age in [base, 2 * base] seconds so pooled connections do not all
            // expire at the same moment.
            std::default_random_engine rng(static_cast<uint32_t>(now));
            std::uniform_int_distribution<> uni(
                    config::meta_service_connection_age_base_seconds,
                    config::meta_service_connection_age_base_seconds * 2);
            deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count();
        }

        // Last one WIN
        std::unique_lock lock(_mutex);
        _last_access_at_ms.store(now, std::memory_order_relaxed);
        _deadline_ms = deadline;
        _stub = *stub;

        // Keep exactly the three most recent reconnect timestamps.
        last_reconn_time_ms.push(now);
        last_reconn_time_ms.pop();
        maybe_unhealthy = false;

        return Status::OK();
    }

private:
    /// True when `config::meta_service_endpoint` contains a comma, i.e. lists several endpoints.
    static bool is_meta_service_endpoint_list() {
        return config::meta_service_endpoint.find(',') != std::string::npos;
    }

    /**
    * This function initializes a pool of `MetaServiceProxy` objects and selects one using
    * round-robin. It returns a client stub via the selected proxy.
    *
    * @param stub A pointer to a shared pointer of `MetaService_Stub` to be retrieved.
    * @param proxy (Optional) A pointer to store the selected `MetaServiceProxy`.
    *
    * @return Status Returns `Status::OK()` on success or an error status on failure.
    */
    static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub,
                                    MetaServiceProxy** proxy) {
        static std::once_flag proxies_flag;
        static size_t num_proxies = 1;
        static std::atomic<size_t> index(0);
        static std::unique_ptr<MetaServiceProxy[]> proxies;
        if (config::meta_service_endpoint.empty()) {
            return Status::InvalidArgument(
                    "Meta service endpoint is empty. Please configure manually or wait for "
                    "heartbeat to obtain.");
        }
        // The pool is sized once, on first use.
        std::call_once(
                proxies_flag, +[]() {
                    if (config::meta_service_connection_pooled) {
                        num_proxies = config::meta_service_connection_pool_size;
                    }
                    proxies = std::make_unique<MetaServiceProxy[]>(num_proxies);
                });

        // Try up to `num_proxies - 1` proxies and return on the first success ...
        for (size_t i = 0; i + 1 < num_proxies; ++i) {
            size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
            Status s = proxies[next_index].get(stub);
            if (proxy != nullptr) {
                *proxy = &(proxies[next_index]);
            }
            if (s.ok()) return Status::OK();
        }

        // ... and report whatever the last attempt yields.
        size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
        if (proxy != nullptr) {
            *proxy = &(proxies[next_index]);
        }
        return proxies[next_index].get(stub);
    }

    /// Initializes `channel` against the configured meta-service endpoint. A comma-separated
    /// endpoint list uses brpc's `list://` naming with the "random" load balancer; a single
    /// endpoint is resolved to ip:port. Every channel gets its own `connection_group`.
    static Status init_channel(brpc::Channel* channel) {
        static std::atomic<size_t> index = 1;

        const char* load_balancer_name = nullptr;
        std::string endpoint;
        if (is_meta_service_endpoint_list()) {
            endpoint = fmt::format("list://{}", config::meta_service_endpoint);
            load_balancer_name = "random";
        } else {
            std::string ip;
            uint16_t port;
            Status s = get_meta_service_ip_and_port(&ip, &port);
            if (!s.ok()) {
                LOG(WARNING) << "fail to get meta service ip and port: " << s;
                return s;
            }
            endpoint = get_host_port(ip, port);
        }

        brpc::ChannelOptions options;
        options.connection_group =
                fmt::format("ms_{}", index.fetch_add(1, std::memory_order_relaxed));
        if (channel->Init(endpoint.c_str(), load_balancer_name, &options) != 0) {
            return Status::InvalidArgument("failed to init brpc channel, endpoint: {}", endpoint);
        }
        return Status::OK();
    }

    /// Splits `config::meta_service_endpoint` into host and port and, if the host part is not
    /// already an IP address, resolves it through `hostname_to_ip`.
    static Status get_meta_service_ip_and_port(std::string* ip, uint16_t* port) {
        std::string parsed_host;
        if (!parse_endpoint(config::meta_service_endpoint, &parsed_host, port)) {
            return Status::InvalidArgument("invalid meta service endpoint: {}",
                                           config::meta_service_endpoint);
        }
        if (is_valid_ip(parsed_host)) {
            *ip = std::move(parsed_host);
            return Status::OK();
        }
        return hostname_to_ip(parsed_host, *ip);
    }

    /// True when idle-timeout is enabled, the endpoint is not a list, and the last access is
    /// more than `config::meta_service_idle_connection_timeout_ms` before `now`.
    bool is_idle_timeout(long now) {
        auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms;
        // idle timeout only works without list endpoint.
        return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 &&
               _last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
    }

    std::shared_mutex _mutex;
    // Millisecond timestamp of the last `get` that handed out a stub.
    std::atomic<long> _last_access_at_ms {0};
    // Millisecond timestamp after which the cached stub is considered too old.
    long _deadline_ms {0};
    std::shared_ptr<MetaService_Stub> _stub;
    // Timestamps (ms) of the three most recent channel re-creations, oldest first.
    std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}};
    bool maybe_unhealthy = false;
};
/// Compile-time membership test: `value` is true iff `T` is exactly one of `Ts...`
/// (and false for an empty `Ts...`).
template <typename T, typename... Ts>
struct is_any : std::bool_constant<(std::is_same_v<T, Ts> || ...)> {};

/// Variable-template shorthand for `is_any<T, Ts...>::value`.
template <typename T, typename... Ts>
constexpr bool is_any_v = (std::is_same_v<T, Ts> || ...);
template <typename Request>
static std::string debug_info(const Request& req) {
if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) {
return fmt::format(" txn_id={}", req.txn_id());
} else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) {
return fmt::format(" tablet_id={}", req.job().idx().tablet_id());
} else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) {
return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id());
} else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) {
return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id());
} else if constexpr (is_any_v<Request, GetTabletRequest>) {
return fmt::format(" tablet_id={}", req.tablet_id());
} else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest,
GetInstanceRequest>) {
return "";
} else if constexpr (is_any_v<Request, CreateRowsetRequest>) {
return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id());
} else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) {
return fmt::format(" tablet_id={}", req.tablet_id());
} else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) {
return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(),
req.tablet_id(), req.lock_id());
} else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) {
return fmt::format(" tablet_id={}", req.tablet_id());
} else if constexpr (is_any_v<Request, GetSchemaDictRequest>) {
return fmt::format(" index_id={}", req.index_id());
} else if constexpr (is_any_v<Request, RestoreJobRequest>) {
return fmt::format(" tablet_id={}", req.tablet_id());
} else if constexpr (is_any_v<Request, UpdatePackedFileInfoRequest>) {
return fmt::format(" packed_file_path={}", req.packed_file_path());
} else {
static_assert(!sizeof(Request));
}
}
/// Builds a `std::default_random_engine` seeded with the current `steady_clock` tick count,
/// truncated to 32 bits.
inline std::default_random_engine make_random_engine() {
    const auto now = std::chrono::steady_clock::now();
    const auto ticks = now.time_since_epoch().count();
    std::default_random_engine engine(static_cast<uint32_t>(ticks));
    return engine;
}
// Pointer-to-member type for a `MetaService_Stub` method taking (controller, request, response,
// done-closure). retry_rpc() uses it to select which RPC to invoke on the stub.
template <typename Request, typename Response>
using MetaServiceMethod = void (MetaService_Stub::*)(::google::protobuf::RpcController*,
const Request*, Response*,
::google::protobuf::Closure*);
/// Invokes `method` on a meta-service stub synchronously and retries on transport failures and on
/// `KV_TXN_CONFLICT` responses, sleeping a random interval between attempts.
///
/// @param op_name Operation name used in log and error messages. "get delete bitmap" and
///                "update delete bitmap" run with three times the configured RPC timeout.
/// @param req     Request message. Its `request_ip` field is overwritten before sending.
/// @param res     Response message; cleared before every attempt.
/// @param method  Stub member function to call.
/// @return OK when the response status is OK; INVALID_ARGUMENT or INTERNAL_ERROR for response
///         codes that are not retried; RpcError once the applicable retry limit is exceeded;
///         or the error from obtaining a proxy/stub.
template <typename Request, typename Response>
Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
                 MetaServiceMethod<Request, Response> method) {
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);

    // Applies only to the current file, and all req are non-const, but passed as const types.
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());

    int retry_times = 0;
    uint32_t duration_ms = 0;
    std::string error_msg;
    std::default_random_engine rng = make_random_engine();
    std::uniform_int_distribution<uint32_t> u(20, 200);
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
    MetaServiceProxy* proxy;
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
    while (true) {
        // Re-fetch the stub every attempt: the proxy may have rebuilt its channel.
        std::shared_ptr<MetaService_Stub> stub;
        RETURN_IF_ERROR(proxy->get(&stub));
        brpc::Controller cntl;
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
        } else {
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
        }
        cntl.set_max_retry(kBrpcRetryTimes);
        res->Clear();
        int error_code = 0;
        (stub.get()->*method)(&cntl, &req, res, nullptr);
        if (cntl.Failed()) [[unlikely]] {
            error_msg = cntl.ErrorText();
            error_code = cntl.ErrorCode();
            proxy->set_unhealthy();
        } else if (res->status().code() == MetaServiceCode::OK) {
            return Status::OK();
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
                                                                     res->status().msg());
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
                                                                   res->status().msg());
        } else {
            // KV_TXN_CONFLICT: the only response code that is retried.
            error_msg = res->status().msg();
        }

        if (error_code == brpc::ERPCTIMEDOUT) {
            g_cloud_meta_mgr_rpc_timeout_count << 1;
        }

        // Give up when the general limit, the timeout-specific limit or the conflict-specific
        // limit is exceeded.
        ++retry_times;
        if (retry_times > config::meta_service_rpc_retry_times ||
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
             error_code == brpc::ERPCTIMEDOUT) ||
            (retry_times > config::meta_service_conflict_error_retry_times &&
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
            break;
        }

        // Short sleeps for the first 100 retries, longer ones afterwards.
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
        // Log `error_msg` rather than cntl.ErrorText(): the latter is empty when the RPC itself
        // succeeded and the retry is caused by a KV_TXN_CONFLICT response.
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
                     << " sleep=" << duration_ms << "ms : " << error_msg;
        bthread_usleep(duration_ms * 1000);
    }
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
}
} // namespace
/// Fetches the meta of `tablet_id` from the meta service and stores a freshly built `TabletMeta`
/// in `*tablet_meta`.
///
/// @return OK on success, NotFound when the meta service answers TABLET_NOT_FOUND, otherwise the
///         error produced by the RPC helper.
Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) {
    VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id;
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", Status::OK(), tablet_id,
                                      tablet_meta);

    GetTabletRequest request;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_tablet_id(tablet_id);
    GetTabletResponse response;

    Status rpc_status =
            retry_rpc("get tablet meta", request, &response, &MetaService_Stub::get_tablet);
    if (rpc_status.ok()) {
        *tablet_meta = std::make_shared<TabletMeta>();
        TabletMeta& meta = **tablet_meta;
        meta.init_from_pb(cloud_tablet_meta_to_doris(std::move(*response.mutable_tablet_meta())));
        VLOG_DEBUG << "get tablet meta, tablet_id: " << meta.tablet_id();
        return Status::OK();
    }

    // Surface a missing tablet as NotFound instead of the generic RPC helper error.
    if (response.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
        return Status::NotFound("failed to get tablet meta: {}", response.status().msg());
    }
    return rpc_status;
}
/// Locking entry point for rowset synchronisation: acquires the tablet's sync-meta lock and
/// delegates to `sync_tablet_rowsets_unlocked`.
Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options,
                                         SyncRowsetStats* sync_stats) {
    std::unique_lock<bthread::Mutex> sync_meta_guard(tablet->get_sync_meta_lock());
    return sync_tablet_rowsets_unlocked(tablet, sync_meta_guard, options, sync_stats);
}
/// Emits one verbose INFO line that summarises a freshly synced merge-on-write delete bitmap:
/// per-rowset (rowset, count, cardinality) tuples for the rowsets in `resp` and for the local
/// rowsets up to `old_max_version` that are not part of `resp`.
///
/// Does nothing unless `config::enable_mow_verbose_log` is set, `resp` contains rowsets and
/// `delete_bitmap` is non-empty.
///
/// @return OK, or the error from collecting the local rowset ids.
Status CloudMetaMgr::_log_mow_delete_bitmap(CloudTablet* tablet, GetRowsetResponse& resp,
                                            DeleteBitmap& delete_bitmap, int64_t old_max_version,
                                            bool full_sync, int32_t read_version) {
    const bool nothing_to_log = !config::enable_mow_verbose_log || resp.rowset_meta().empty() ||
                                delete_bitmap.cardinality() == 0;
    if (nothing_to_log) {
        return Status::OK();
    }

    const int64_t tablet_id = tablet->tablet_id();

    // Returns {count, cardinality} of the part of `delete_bitmap` that belongs to rowset `id`.
    auto slice_stats = [&](const RowsetId& id) -> std::pair<size_t, size_t> {
        DeleteBitmap slice(tablet_id);
        delete_bitmap.subset({id, 0, 0},
                             {id, std::numeric_limits<DeleteBitmap::SegmentId>::max(),
                              std::numeric_limits<DeleteBitmap::Version>::max()},
                             &slice);
        size_t cardinality = slice.cardinality();
        size_t count = slice.get_delete_bitmap_count();
        return {count, cardinality};
    };

    const int64_t new_max_version = resp.rowset_meta().rbegin()->end_version();

    // Rowsets delivered by this sync.
    std::vector<std::string> new_rowset_msgs;
    std::unordered_set<RowsetId> new_rowset_ids;
    for (const auto& rs_pb : resp.rowset_meta()) {
        RowsetId id;
        id.init(rs_pb.rowset_id_v2());
        new_rowset_ids.insert(id);
        const auto [count, cardinality] = slice_stats(id);
        if (cardinality > 0) {
            new_rowset_msgs.push_back(fmt::format("({}[{}-{}],{},{})", rs_pb.rowset_id_v2(),
                                                  rs_pb.start_version(), rs_pb.end_version(),
                                                  count, cardinality));
        }
    }

    // Local rowsets up to the previous max version that this sync did not return.
    std::vector<std::string> old_rowset_msgs;
    if (old_max_version > 0) {
        std::vector<RowsetSharedPtr> old_rowsets;
        RowsetIdUnorderedSet old_rowset_ids;
        {
            std::lock_guard<std::shared_mutex> rlock(tablet->get_header_lock());
            RETURN_IF_ERROR(tablet->get_all_rs_id_unlocked(old_max_version, &old_rowset_ids));
            old_rowsets = tablet->get_rowset_by_ids(&old_rowset_ids);
        }
        for (const auto& rs : old_rowsets) {
            if (new_rowset_ids.contains(rs->rowset_id())) {
                continue;
            }
            const auto [count, cardinality] = slice_stats(rs->rowset_id());
            if (cardinality > 0) {
                old_rowset_msgs.push_back(fmt::format("({}{},{},{})",
                                                      rs->rowset_id().to_string(),
                                                      rs->version().to_string(), count,
                                                      cardinality));
            }
        }
    }

    std::string tablet_info = fmt::format(
            "tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(),
            tablet->table_id(), tablet->index_id(), tablet->partition_id());
    LOG_INFO("[verbose] sync tablet delete bitmap " + tablet_info)
            .tag("full_sync", full_sync)
            .tag("read_version", read_version)
            .tag("old_max_version", old_max_version)
            .tag("new_max_version", new_max_version)
            .tag("cumu_compaction_cnt", resp.stats().cumulative_compaction_cnt())
            .tag("base_compaction_cnt", resp.stats().base_compaction_cnt())
            .tag("cumu_point", resp.stats().cumulative_point())
            .tag("rowset_num", resp.rowset_meta().size())
            .tag("delete_bitmap_cardinality", delete_bitmap.cardinality())
            .tag("old_rowsets(rowset,count,cardinality)",
                 fmt::format("[{}]", fmt::join(old_rowset_msgs, ", ")))
            .tag("new_rowsets(rowset,count,cardinality)",
                 fmt::format("[{}]", fmt::join(new_rowset_msgs, ", ")));
    return Status::OK();
}
// Pulls rowset metas (and, for merge-on-write tablets, delete bitmaps) newer than the tablet's
// local state from the meta service and applies them to `tablet`.
//
// The whole request/apply sequence runs inside a retry loop that is re-entered when
//   * the get_rowset RPC fails at transport level (up to meta_service_rpc_retry_times),
//   * syncing the delete bitmap reports ROWSETS_EXPIRED (same budget), or
//   * the response's compaction counters are older than the tablet's (up to 10 tries, after
//     which OK is returned without applying anything).
//
// `lock` is the caller-held guard on the tablet's sync-meta lock; it is forwarded to
// sync_tablet_delete_bitmap's sibling helpers only indirectly and is not released here.
// `sync_stats` is optional and, when set, accumulates RPC time and rowset counts.
Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
std::unique_lock<bthread::Mutex>& lock,
const SyncOptions& options,
SyncRowsetStats* sync_stats) {
using namespace std::chrono;
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet);
MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
std::string tablet_info =
fmt::format("tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(),
tablet->table_id(), tablet->index_id(), tablet->partition_id());
// Shared retry counter for all three retry reasons described above.
int tried = 0;
while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
GetRowsetRequest req;
GetRowsetResponse resp;
int64_t tablet_id = tablet->tablet_id();
int64_t table_id = tablet->table_id();
int64_t index_id = tablet->index_id();
req.set_cloud_unique_id(config::cloud_unique_id);
auto* idx = req.mutable_idx();
idx->set_tablet_id(tablet_id);
idx->set_table_id(table_id);
idx->set_index_id(index_id);
idx->set_partition_id(tablet->partition_id());
{
// Snapshot the local version and compaction counters under the header read lock.
std::shared_lock rlock(tablet->get_header_lock());
if (options.full_sync) {
req.set_start_version(0);
} else {
req.set_start_version(tablet->max_version_unlocked() + 1);
}
req.set_base_compaction_cnt(tablet->base_compaction_cnt());
req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
req.set_full_compaction_cnt(tablet->full_compaction_cnt());
req.set_cumulative_point(tablet->cumulative_layer_point());
}
req.set_end_version(-1);
VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();
auto start = std::chrono::steady_clock::now();
stub->get_rowset(&cntl, &req, &resp, nullptr);
auto end = std::chrono::steady_clock::now();
int64_t latency = cntl.latency_us();
_get_rowset_latency << latency;
int retry_times = config::meta_service_rpc_retry_times;
if (cntl.Failed()) {
proxy->set_unhealthy();
if (tried++ < retry_times) {
auto rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(20, 200);
std::uniform_int_distribution<uint32_t> u1(500, 1000);
// NOTE(review): retry_rpc() in this file uses the short 20-200ms range for the first
// 100 retries; here the short range is only used once tried >= 100 — confirm intent.
uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng);
bthread_usleep(duration_ms * 1000);
LOG_INFO("failed to get rowset meta, " + tablet_info)
.tag("reason", cntl.ErrorText())
.tag("tried", tried)
.tag("sleep", duration_ms);
continue;
}
return Status::RpcError("failed to get rowset meta: {}", cntl.ErrorText());
}
if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
LOG(WARNING) << "failed to get rowset meta, err=" << resp.status().msg() << " "
<< tablet_info;
return Status::NotFound("failed to get rowset meta: {}, {}", resp.status().msg(),
tablet_info);
}
if (resp.status().code() != MetaServiceCode::OK) {
LOG(WARNING) << " failed to get rowset meta, err=" << resp.status().msg() << " "
<< tablet_info;
return Status::InternalError("failed to get rowset meta: {}, {}", resp.status().msg(),
tablet_info);
}
// Always log slow RPCs; sample the fast ones.
if (latency > 100 * 1000) { // 100ms
LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size()
<< ", latency=" << latency << "us"
<< " " << tablet_info;
} else {
LOG_EVERY_N(INFO, 100)
<< "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size()
<< ", latency=" << latency << "us"
<< " " << tablet_info;
}
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
tablet->last_sync_time_s = now;
if (sync_stats) {
sync_stats->get_remote_rowsets_rpc_ns +=
std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
sync_stats->get_remote_rowsets_num += resp.rowset_meta().size();
}
// If is mow, the tablet has no delete bitmap in base rowsets.
// So don't need to sync it.
if (options.sync_delete_bitmap && tablet->enable_unique_key_merge_on_write() &&
tablet->tablet_state() == TABLET_RUNNING) {
DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.sync_tablet_delete_bitmap.block",
DBUG_BLOCK);
DeleteBitmap delete_bitmap(tablet_id);
// start_version was set to local max version + 1 (or 0 for a full sync) above.
int64_t old_max_version = req.start_version() - 1;
auto read_version = config::delete_bitmap_store_read_version;
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),
resp.stats(), req.idx(), &delete_bitmap,
options.full_sync, sync_stats, read_version, false);
// ROWSETS_EXPIRED: restart the loop so that rowsets are fetched again.
if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) {
LOG_INFO("rowset meta is expired, need to retry, " + tablet_info)
.tag("tried", tried)
.error(st);
continue;
}
if (!st.ok()) {
LOG_WARNING("failed to get delete bitmap, " + tablet_info).error(st);
return st;
}
tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap);
RETURN_IF_ERROR(_log_mow_delete_bitmap(tablet, resp, delete_bitmap, old_max_version,
options.full_sync, read_version));
RETURN_IF_ERROR(
_check_delete_bitmap_v2_correctness(tablet, req, resp, old_max_version));
}
DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta", {
auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
if (target_tablet_id == tablet->tablet_id()) {
DBUG_BLOCK
}
});
{
const auto& stats = resp.stats();
// Everything below mutates tablet state and runs under the header write lock.
std::unique_lock wlock(tablet->get_header_lock());
// ATTN: we are facing following data race
//
// resp_base_compaction_cnt=0|base_compaction_cnt=0|resp_cumulative_compaction_cnt=0|cumulative_compaction_cnt=1|resp_max_version=11|max_version=8
//
// BE-compaction-thread meta-service BE-query-thread
// | | |
// local | commit cumu-compaction | |
// cc_cnt=0 | ---------------------------> | sync rowset (long rpc, local cc_cnt=0 ) | local
// | | <----------------------------------------- | cc_cnt=0
// | | -. |
// local | done cc_cnt=1 | \ |
// cc_cnt=1 | <--------------------------- | \ |
// | | \ returned with resp cc_cnt=0 (snapshot) |
// | | '------------------------------------> | local
// | | | cc_cnt=1
// | | |
// | | | CHECK FAIL
// | | | need retry
// To get rid of just retry syncing tablet
if (stats.base_compaction_cnt() < tablet->base_compaction_cnt() ||
stats.cumulative_compaction_cnt() < tablet->cumulative_compaction_cnt())
[[unlikely]] {
// stale request, ignore
LOG_WARNING("stale get rowset meta request " + tablet_info)
.tag("resp_base_compaction_cnt", stats.base_compaction_cnt())
.tag("base_compaction_cnt", tablet->base_compaction_cnt())
.tag("resp_cumulative_compaction_cnt", stats.cumulative_compaction_cnt())
.tag("cumulative_compaction_cnt", tablet->cumulative_compaction_cnt())
.tag("tried", tried);
// Retry a bounded number of times; afterwards give up silently with OK.
if (tried++ < 10) continue;
return Status::OK();
}
std::vector<RowsetSharedPtr> rowsets;
rowsets.reserve(resp.rowset_meta().size());
for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) {
VLOG_DEBUG << "get rowset meta, tablet_id=" << cloud_rs_meta_pb.tablet_id()
<< ", version=[" << cloud_rs_meta_pb.start_version() << '-'
<< cloud_rs_meta_pb.end_version() << ']';
auto existed_rowset = tablet->get_rowset_by_version(
{cloud_rs_meta_pb.start_version(), cloud_rs_meta_pb.end_version()});
if (existed_rowset &&
existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) {
continue; // Same rowset, skip it
}
RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb);
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->init_from_pb(meta_pb);
RowsetSharedPtr rowset;
// schema is nullptr implies using RowsetMeta.tablet_schema
Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
if (!s.ok()) {
LOG_WARNING("create rowset").tag("status", s);
return s;
}
rowsets.push_back(std::move(rowset));
}
if (!rowsets.empty()) {
// `rowsets.empty()` could happen after doing EMPTY_CUMULATIVE compaction. e.g.:
// BE has [0-1][2-11][12-12], [12-12] is delete predicate, cp is 2;
// after doing EMPTY_CUMULATIVE compaction, MS cp is 13, get_rowset will return [2-11][12-12].
bool version_overlap =
tablet->max_version_unlocked() >= rowsets.front()->start_version();
tablet->add_rowsets(std::move(rowsets), version_overlap, wlock,
options.warmup_delta_data ||
config::enable_warmup_immediately_on_new_rowset);
}
// Fill version holes
int64_t partition_max_version =
resp.has_partition_max_version() ? resp.partition_max_version() : -1;
RETURN_IF_ERROR(fill_version_holes(tablet, partition_max_version, wlock));
// Adopt the compaction bookkeeping and approximate stats reported by the meta service.
tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms();
tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms();
tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
tablet->set_full_compaction_cnt(stats.full_compaction_cnt());
tablet->set_cumulative_layer_point(stats.cumulative_point());
tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
stats.num_rows(), stats.data_size());
}
return Status::OK();
}
}
/// Tries to assemble the delete bitmap for `rs_metas` purely from the local txn delete-bitmap
/// cache, merging the result into `delete_bitmap`.
///
/// @return true when every distinct transaction among `rs_metas` was served from the cache with
///         publish status SUCCEED; false as soon as one is missing or not yet published.
bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
                                                      std::ranges::range auto&& rs_metas,
                                                      DeleteBitmap* delete_bitmap) {
    std::set<int64_t> seen_txns;
    for (auto& rs_meta : rs_metas) {
        const auto txn_id = rs_meta.txn_id();
        // Handle each transaction only once.
        if (!seen_txns.insert(txn_id).second) {
            continue;
        }

        DeleteBitmapPtr cached_bitmap;
        auto publish_status = std::make_shared<PublishStatus>(PublishStatus::INIT);
        CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
        Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap(
                txn_id, tablet->tablet_id(), &cached_bitmap, nullptr, &publish_status);

        // This function runs after the rowsets were synced from the meta service, so they are
        // guaranteed to be committed there. A cache entry whose publish status is SUCCEED can
        // therefore be used directly, without checking versions or compaction stats.
        const bool cache_hit = status.ok() && *publish_status == PublishStatus::SUCCEED;
        if (!cache_hit) {
            LOG_EVERY_N(INFO, 20)
                    << "delete bitmap not found in cache, will sync rowset to get. tablet_id= "
                    << tablet->tablet_id() << ", txn_id=" << txn_id << ", status=" << status;
            return false;
        }

        // Cached keys carry DeleteBitmap::TEMP_VERSION_COMMON as version; substitute the rowset's
        // real version while merging.
        DCHECK(rs_meta.start_version() == rs_meta.end_version());
        int64_t rowset_version = rs_meta.start_version();
        for (const auto& [key, bitmap] : cached_bitmap->delete_bitmap) {
            const auto& segment_id = std::get<1>(key);
            // Sentinel marks exist only for the delete bitmap correctness check; drop them.
            if (segment_id == DeleteBitmap::INVALID_SEGMENT_ID) {
                continue;
            }
            delete_bitmap->merge({std::get<0>(key), segment_id, rowset_version}, bitmap);
        }
        engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id,
                                                                       tablet->tablet_id());
    }
    return true;
}
/// Sends a single GetDeleteBitmap RPC and maps the outcome to a `Status`.
///
/// @param req Request to send.
/// @param res Response filled by the RPC.
/// @return OK on success; the RPC error when the call failed at transport level; NotFound for
///         TABLET_NOT_FOUND; ROWSETS_EXPIRED when the requested rowsets became stale; INTERNAL_ERROR
///         for any other non-OK response code; or the error reported by the RPC helper when it
///         failed before a response was received.
Status CloudMetaMgr::_get_delete_bitmap_from_ms(GetDeleteBitmapRequest& req,
                                                GetDeleteBitmapResponse& res) {
    VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
    TEST_SYNC_POINT_CALLBACK("CloudMetaMgr::_get_delete_bitmap_from_ms", &req, &res);

    auto st = retry_rpc("get delete bitmap", req, &res, &MetaService_Stub::get_delete_bitmap);
    if (st.code() == ErrorCode::THRIFT_RPC_ERROR) {
        return st;
    }

    if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
        return Status::NotFound("failed to get delete bitmap: {}", res.status().msg());
    }
    // The delete bitmap of stale rowsets will be removed when commit compaction job,
    // then delete bitmap of stale rowsets cannot be obtained. But the rowsets obtained
    // by sync_tablet_rowsets may include these stale rowsets. When this case happened, the
    // error code of ROWSETS_EXPIRED will be returned, we need to retry sync rowsets again.
    //
    // Be query thread             meta-service          Be compaction thread
    //      |                            |                         |
    //      |        get rowset          |                         |
    //      |--------------------------->|                         |
    //      |    return get rowset       |                         |
    //      |<---------------------------|                         |
    //      |                            |        commit job       |
    //      |                            |<------------------------|
    //      |                            |    return commit job    |
    //      |                            |------------------------>|
    //      |      get delete bitmap     |                         |
    //      |--------------------------->|                         |
    //      |  return get delete bitmap  |                         |
    //      |<---------------------------|                         |
    //      |                            |                         |
    if (res.status().code() == MetaServiceCode::ROWSETS_EXPIRED) {
        return Status::Error<ErrorCode::ROWSETS_EXPIRED, false>("failed to get delete bitmap: {}",
                                                                res.status().msg());
    }
    if (res.status().code() != MetaServiceCode::OK) {
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to get delete bitmap: {}",
                                                               res.status().msg());
    }
    // The response code is OK here. That is also the default of an untouched response, which is
    // what we see when the RPC helper failed before any RPC was sent (e.g. no proxy or channel
    // could be set up). Propagate `st` so such a failure is not reported as success.
    return st;
}
/// Fetches the delete bitmaps described by `req` in as many RPC rounds as the meta service needs,
/// accumulating every partial response into `res`.
///
/// Each round re-requests only the rowsets that no earlier round reported in
/// `returned_rowset_ids`. The loop ends when a response does not set `has_more`, or when every
/// requested rowset has been reported.
///
/// @param req              Template request; rowset ids and version ranges are taken from it.
/// @param res              Accumulates v1 and v2 delete bitmap fields of all rounds.
/// @param bytes_threadhold When positive, forwarded as `dbm_bytes_threshold` of each round.
/// @return OK, or the first error of a round.
Status CloudMetaMgr::_get_delete_bitmap_from_ms_by_batch(GetDeleteBitmapRequest& req,
                                                         GetDeleteBitmapResponse& res,
                                                         int64_t bytes_threadhold) {
    std::unordered_set<std::string> done_rowset_ids;
    int rpc_count = 0;
    while (true) {
        // Build the request of this round from the template.
        GetDeleteBitmapRequest batch_req;
        batch_req.set_cloud_unique_id(config::cloud_unique_id);
        batch_req.set_tablet_id(req.tablet_id());
        batch_req.set_base_compaction_cnt(req.base_compaction_cnt());
        batch_req.set_cumulative_compaction_cnt(req.cumulative_compaction_cnt());
        batch_req.set_cumulative_point(req.cumulative_point());
        *(batch_req.mutable_idx()) = req.idx();
        batch_req.set_store_version(req.store_version());
        if (bytes_threadhold > 0) {
            batch_req.set_dbm_bytes_threshold(bytes_threadhold);
        }
        const int total = req.rowset_ids_size();
        for (int pos = 0; pos < total; ++pos) {
            const auto& rowset_id = req.rowset_ids(pos);
            if (done_rowset_ids.contains(rowset_id)) {
                continue;
            }
            batch_req.add_rowset_ids(rowset_id);
            batch_req.add_begin_versions(req.begin_versions(pos));
            batch_req.add_end_versions(req.end_versions(pos));
        }

        GetDeleteBitmapResponse batch_res;
        RETURN_IF_ERROR(_get_delete_bitmap_from_ms(batch_req, batch_res));
        ++rpc_count;

        // v1 delete bitmap
        res.mutable_rowset_ids()->MergeFrom(batch_res.rowset_ids());
        res.mutable_segment_ids()->MergeFrom(batch_res.segment_ids());
        res.mutable_versions()->MergeFrom(batch_res.versions());
        res.mutable_segment_delete_bitmaps()->MergeFrom(batch_res.segment_delete_bitmaps());
        // v2 delete bitmap
        res.mutable_delta_rowset_ids()->MergeFrom(batch_res.delta_rowset_ids());
        res.mutable_delete_bitmap_storages()->MergeFrom(batch_res.delete_bitmap_storages());

        done_rowset_ids.insert(batch_res.returned_rowset_ids().begin(),
                               batch_res.returned_rowset_ids().end());

        const bool more_pending = batch_res.has_has_more() && batch_res.has_more();
        if (!more_pending) {
            break;
        }
        LOG_INFO("batch get delete bitmap, progress={}/{}", done_rowset_ids.size(),
                 req.rowset_ids_size())
                .tag("tablet_id", req.tablet_id())
                .tag("cur_returned_rowsets", batch_res.returned_rowset_ids_size())
                .tag("rpc_count", rpc_count);
        if (done_rowset_ids.size() >= req.rowset_ids_size()) {
            break;
        }
    }
    return Status::OK();
}
// Fills `delete_bitmap` with the delete bitmaps needed after the rowsets in
// `rs_metas` were fetched for `tablet`.
//
// Flow:
//   1. Nothing to do when `rs_metas` is empty.
//   2. Unless `full_sync` is set, try the local cache path (when enabled by
//      config); on success only the local counters in `sync_stats` are updated.
//   3. Otherwise reset `*delete_bitmap`, validate `read_version` against the
//      configured write version, and fetch from the meta-service:
//        - new rowsets: versions [0, new_max_version]
//        - old rowsets: (old_max_version, new_max_version] for incremental
//          sync, or [0, new_max_version] when `full_sync_v2` is set.
//   4. Merge v1 bitmaps from the response directly and v2 bitmaps through
//      _read_tablet_delete_bitmap_v2, then update `sync_stats` (may be null).
Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
                                               std::ranges::range auto&& rs_metas,
                                               const TabletStatsPB& stats, const TabletIndexPB& idx,
                                               DeleteBitmap* delete_bitmap, bool full_sync,
                                               SyncRowsetStats* sync_stats, int32_t read_version,
                                               bool full_sync_v2) {
    if (rs_metas.empty()) {
        return Status::OK();
    }

    const bool served_by_cache =
            !full_sync && config::enable_sync_tablet_delete_bitmap_by_cache &&
            sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap);
    if (served_by_cache) {
        if (sync_stats) {
            sync_stats->get_local_delete_bitmap_rowsets_num += rs_metas.size();
        }
        return Status::OK();
    }
    // The cache path was skipped or did not succeed: start from an empty bitmap.
    DeleteBitmapPtr blank_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
    *delete_bitmap = *blank_bitmap;

    if (read_version == 2 && config::delete_bitmap_store_write_version == 1) {
        return Status::InternalError(
                "please set delete_bitmap_store_read_version to 1 or 3 because "
                "delete_bitmap_store_write_version is 1");
    }
    if (read_version == 1 && config::delete_bitmap_store_write_version == 2) {
        return Status::InternalError(
                "please set delete_bitmap_store_read_version to 2 or 3 because "
                "delete_bitmap_store_write_version is 2");
    }

    const int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version());

    GetDeleteBitmapRequest req;
    GetDeleteBitmapResponse res;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_tablet_id(tablet->tablet_id());
    req.set_base_compaction_cnt(stats.base_compaction_cnt());
    req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
    req.set_cumulative_point(stats.cumulative_point());
    *(req.mutable_idx()) = idx;
    req.set_store_version(read_version);

    // The request carries three parallel arrays; keep them in lock step.
    auto request_versions = [&req](const std::string& rowset_id, int64_t begin_version,
                                   int64_t end_version) {
        req.add_rowset_ids(rowset_id);
        req.add_begin_versions(begin_version);
        req.add_end_versions(end_version);
    };

    // New rowsets sync all versions of the delete bitmap.
    for (const auto& rs_meta : rs_metas) {
        request_versions(rs_meta.rowset_id_v2(), 0, new_max_version);
    }

    // Old rowsets sync only the incremental versions, unless a full v2 sync is
    // requested, in which case all versions are fetched again.
    const bool include_old_rowsets =
            old_max_version > 0 && (full_sync_v2 || old_max_version < new_max_version);
    if (include_old_rowsets) {
        const int64_t old_begin_version = full_sync_v2 ? 0 : old_max_version + 1;
        RowsetIdUnorderedSet all_rs_ids;
        RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
        for (const auto& rs_id : all_rs_ids) {
            request_versions(rs_id.to_string(), old_begin_version, new_max_version);
        }
    }

    if (sync_stats) {
        sync_stats->get_remote_delete_bitmap_rowsets_num += req.rowset_ids_size();
    }

    // Only the RPC(s) are timed; merging below is excluded from the latency.
    const auto rpc_begin = std::chrono::steady_clock::now();
    if (config::enable_batch_get_delete_bitmap) {
        RETURN_IF_ERROR(_get_delete_bitmap_from_ms_by_batch(
                req, res, config::get_delete_bitmap_bytes_threshold));
    } else {
        RETURN_IF_ERROR(_get_delete_bitmap_from_ms(req, res));
    }
    const auto rpc_end = std::chrono::steady_clock::now();

    // v1 delete bitmap
    const auto& rowset_ids = res.rowset_ids();
    const auto& segment_ids = res.segment_ids();
    const auto& vers = res.versions();
    const auto& delete_bitmaps = res.segment_delete_bitmaps();
    const bool v1_sizes_match = rowset_ids.size() == segment_ids.size() &&
                                rowset_ids.size() == vers.size() &&
                                rowset_ids.size() == delete_bitmaps.size();
    if (!v1_sizes_match) {
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                "get delete bitmap data wrong,"
                "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}",
                rowset_ids.size(), segment_ids.size(), vers.size(), delete_bitmaps.size());
    }
    for (int pos = 0; pos < rowset_ids.size(); ++pos) {
        RowsetId rst_id;
        rst_id.init(rowset_ids[pos]);
        const auto& serialized = delete_bitmaps[pos];
        delete_bitmap->merge({rst_id, segment_ids[pos], vers[pos]},
                             roaring::Roaring::readSafe(serialized.data(), serialized.length()));
    }

    // v2 delete bitmap
    const auto& delta_rowset_ids = res.delta_rowset_ids();
    const auto& delete_bitmap_storages = res.delete_bitmap_storages();
    if (delta_rowset_ids.size() != delete_bitmap_storages.size()) {
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                "get delete bitmap data wrong, delta_rowset_ids.size={}, "
                "delete_bitmap_storages.size={}",
                delta_rowset_ids.size(), delete_bitmap_storages.size());
    }
    int64_t remote_delete_bitmap_bytes = 0;
    RETURN_IF_ERROR(_read_tablet_delete_bitmap_v2(tablet, old_max_version, rs_metas, delete_bitmap,
                                                  res, remote_delete_bitmap_bytes, full_sync_v2));

    if (sync_stats) {
        sync_stats->get_remote_delete_bitmap_rpc_ns +=
                std::chrono::duration_cast<std::chrono::nanoseconds>(rpc_end - rpc_begin).count();
        sync_stats->get_remote_delete_bitmap_key_count +=
                delete_bitmaps.size() + delete_bitmap_storages.size();
        for (const auto& serialized : delete_bitmaps) {
            sync_stats->get_remote_delete_bitmap_bytes += serialized.length();
        }
        sync_stats->get_remote_delete_bitmap_bytes += remote_delete_bitmap_bytes;
    }

    // Slow calls (> 100ms) are always logged; fast ones are sampled.
    const int64_t latency =
            std::chrono::duration_cast<std::chrono::microseconds>(rpc_end - rpc_begin).count();
    if (latency > 100 * 1000) {
        LOG(INFO) << "finish get_delete_bitmap rpcs. rowset_ids.size()=" << rowset_ids.size()
                  << ", delete_bitmaps.size()=" << delete_bitmaps.size()
                  << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size()
                  << ", latency=" << latency << "us, read_version=" << read_version;
    } else {
        LOG_EVERY_N(INFO, 100) << "finish get_delete_bitmap rpcs. rowset_ids.size()="
                               << rowset_ids.size()
                               << ", delete_bitmaps.size()=" << delete_bitmaps.size()
                               << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size()
                               << ", latency=" << latency << "us, read_version=" << read_version;
    }
    return Status::OK();
}
// Diagnostic check: re-reads the tablet's whole delete bitmap through the v2
// store and compares it, per rowset segment, against the delete bitmap held in
// the tablet meta. Mismatches are only logged.
//
// The check is skipped when it is disabled by config, when the configured write
// version is 1, or when `resp` carries no rowset meta.
//
// Returns an error only if the tablet's existing rowset ids cannot be listed;
// a failed v2 read or a mismatch still yields Status::OK().
Status CloudMetaMgr::_check_delete_bitmap_v2_correctness(CloudTablet* tablet, GetRowsetRequest& req,
                                                         GetRowsetResponse& resp,
                                                         int64_t old_max_version) {
    if (!config::enable_delete_bitmap_store_v2_check_correctness ||
        config::delete_bitmap_store_write_version == 1 || resp.rowset_meta().empty()) {
        return Status::OK();
    }
    int64_t tablet_id = tablet->tablet_id();
    int64_t new_max_version = std::max(old_max_version, resp.rowset_meta().rbegin()->end_version());

    // (rowset_id, num_segments) of every rowset to verify: the newly fetched
    // ones plus the ones the tablet already had up to old_max_version.
    std::vector<std::pair<RowsetId, int64_t>> all_rowsets;
    for (const auto& rs_meta : resp.rowset_meta()) {
        RowsetId rowset_id;
        rowset_id.init(rs_meta.rowset_id_v2());
        all_rowsets.emplace_back(rowset_id, rs_meta.num_segments());
    }
    if (old_max_version > 0) {
        RowsetIdUnorderedSet all_rs_ids;
        RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
        for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) {
            all_rowsets.emplace_back(rowset->rowset_id(), rowset->num_segments());
        }
    }

    // Compares the aggregated bitmap at new_max_version for every segment.
    auto compare_delete_bitmap = [&](DeleteBitmap* delete_bitmap, int version) {
        bool success = true;
        for (auto& [rs_id, num_segments] : all_rowsets) {
            for (int seg_id = 0; seg_id < num_segments; ++seg_id) {
                DeleteBitmap::BitmapKey key = {rs_id, seg_id, new_max_version};
                auto dm1 = tablet->tablet_meta()->delete_bitmap().get_agg(key);
                auto dm2 = delete_bitmap->get_agg_without_cache(key);
                if (*dm1 != *dm2) {
                    success = false;
                    LOG(WARNING) << "failed to check delete bitmap correctness by v" << version
                                 << ", tablet_id=" << tablet->tablet_id()
                                 << ", rowset_id=" << rs_id.to_string() << ", segment_id=" << seg_id
                                 << ", max_version=" << new_max_version
                                 << ". size1=" << dm1->cardinality()
                                 << ", size2=" << dm2->cardinality();
                }
            }
        }
        if (success) {
            LOG(INFO) << "succeed to check delete bitmap correctness by v" << version
                      << ", tablet_id=" << tablet->tablet_id()
                      << ", max_version=" << new_max_version;
        }
    };

    // full_sync=false, no stats, read_version=2, full_sync_v2=true.
    DeleteBitmap full_delete_bitmap(tablet_id);
    auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), resp.stats(),
                                        req.idx(), &full_delete_bitmap, false, nullptr, 2, true);
    if (!st.ok()) {
        LOG_WARNING("failed to check delete bitmap correctness by v2")
                .tag("tablet", tablet->tablet_id())
                .error(st);
    } else {
        compare_delete_bitmap(&full_delete_bitmap, 2);
    }
    return Status::OK();
}
// Merges the v2 delete bitmaps referenced by `res` into `delete_bitmap`.
//
// Every entry of `res.delta_rowset_ids()` has a matching entry in
// `res.delete_bitmap_storages()` (the caller validates that both have the same
// size). An entry stored inline (`store_in_fdb`) is merged on the calling
// thread; otherwise the bitmap is read from a file through the sync-delete-bitmap
// thread pool. Bitmap keys whose rowset is neither one of `rs_metas` nor one of
// the tablet's rowsets up to `old_max_version` are skipped.
//
// `remote_delete_bitmap_bytes` is increased by the serialized size of every
// merged bitmap.
//
// Returns the first error observed (inline merge, task submission or file
// read). The function never returns while a submitted task may still be
// running, because the tasks reference locals of this frame.
Status CloudMetaMgr::_read_tablet_delete_bitmap_v2(CloudTablet* tablet, int64_t old_max_version,
                                                   std::ranges::range auto&& rs_metas,
                                                   DeleteBitmap* delete_bitmap,
                                                   GetDeleteBitmapResponse& res,
                                                   int64_t& remote_delete_bitmap_bytes,
                                                   bool full_sync_v2) {
    if (res.delta_rowset_ids().empty()) {
        return Status::OK();
    }
    const auto& rowset_ids = res.delta_rowset_ids();
    const auto& delete_bitmap_storages = res.delete_bitmap_storages();

    // Rowsets whose bitmaps are accepted, and the resource (vault) id of each
    // rowset whose bitmap file may have to be read.
    RowsetIdUnorderedSet all_rs_ids;
    std::map<std::string, std::string> rowset_to_resource;
    if (old_max_version > 0) {
        RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
        if (full_sync_v2) {
            for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) {
                rowset_to_resource[rowset->rowset_id().to_string()] =
                        rowset->rowset_meta()->resource_id();
            }
        }
    }
    for (const auto& rs_meta : rs_metas) {
        RowsetId rs_id;
        rs_id.init(rs_meta.rowset_id_v2());
        all_rs_ids.emplace(rs_id);
        rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id();
    }
    if (config::enable_mow_verbose_log) {
        LOG(INFO) << "read delete bitmap for tablet_id=" << tablet->tablet_id()
                  << ", old_max_version=" << old_max_version
                  << ", new rowset num=" << rs_metas.size()
                  << ", rowset has delete bitmap num=" << rowset_ids.size()
                  << ". all rowset num=" << all_rs_ids.size();
    }

    // `result_mtx` guards `result`, `*delete_bitmap` and
    // `remote_delete_bitmap_bytes`, which are touched from pool threads.
    std::mutex result_mtx;
    Status result;
    // Keeps only the first failure.
    auto record_failure = [&](const Status& st) {
        std::lock_guard lock(result_mtx);
        if (result.ok()) {
            result = st;
        }
    };

    auto merge_delete_bitmap = [&](const std::string& rowset_id, const DeleteBitmapPB& dbm) {
        if (dbm.rowset_ids_size() != dbm.segment_ids_size() ||
            dbm.rowset_ids_size() != dbm.versions_size() ||
            dbm.rowset_ids_size() != dbm.segment_delete_bitmaps_size()) {
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                    "get delete bitmap data wrong, rowset_id={}, "
                    "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}",
                    rowset_id, dbm.rowset_ids_size(), dbm.segment_ids_size(), dbm.versions_size(),
                    dbm.segment_delete_bitmaps_size());
        }
        if (config::enable_mow_verbose_log) {
            LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
                      << ", rowset_id=" << rowset_id
                      << ", delete_bitmap num=" << dbm.segment_delete_bitmaps_size();
        }
        std::lock_guard lock(result_mtx);
        for (int j = 0; j < dbm.rowset_ids_size(); j++) {
            RowsetId rst_id;
            rst_id.init(dbm.rowset_ids(j));
            if (!all_rs_ids.contains(rst_id)) {
                LOG(INFO) << "skip merge delete bitmap for tablet_id=" << tablet->tablet_id()
                          << ", rowset_id=" << rowset_id << ", unused rowset_id=" << rst_id;
                continue;
            }
            delete_bitmap->merge(
                    {rst_id, dbm.segment_ids(j), dbm.versions(j)},
                    roaring::Roaring::readSafe(dbm.segment_delete_bitmaps(j).data(),
                                               dbm.segment_delete_bitmaps(j).length()));
            remote_delete_bitmap_bytes += dbm.segment_delete_bitmaps(j).length();
        }
        return Status::OK();
    };

    auto get_delete_bitmap_from_file = [&](const std::string& rowset_id) {
        if (config::enable_mow_verbose_log) {
            LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
                      << ", rowset_id=" << rowset_id << " from file";
        }
        // Runs concurrently on pool threads: use find() only, never the
        // non-const operator[] on the shared map.
        const auto resource_it = rowset_to_resource.find(rowset_id);
        if (resource_it == rowset_to_resource.end()) {
            return Status::InternalError("vault id not found for tablet_id={}, rowset_id={}",
                                         tablet->tablet_id(), rowset_id);
        }
        const auto& resource_id = resource_it->second;
        CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
        auto storage_resource = engine.get_storage_resource(resource_id);
        if (!storage_resource) {
            return Status::InternalError("vault id not found, maybe not sync, vault id {}",
                                         resource_id);
        }
        DeleteBitmapFileReader reader(tablet->tablet_id(), rowset_id, storage_resource);
        RETURN_IF_ERROR(reader.init());
        DeleteBitmapPB dbm;
        RETURN_IF_ERROR(reader.read(dbm));
        RETURN_IF_ERROR(reader.close());
        return merge_delete_bitmap(rowset_id, dbm);
    };

    CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
    std::unique_ptr<ThreadPoolToken> token = engine.sync_delete_bitmap_thread_pool().new_token(
            ThreadPool::ExecutionMode::CONCURRENT);
    // One count per entry; each entry is counted down exactly once, either by
    // this thread or by the task that handles it.
    bthread::CountdownEvent wait {rowset_ids.size()};
    for (int i = 0; i < rowset_ids.size(); i++) {
        auto& rowset_id = rowset_ids[i];
        Status dispatch_st;
        if (delete_bitmap_storages[i].store_in_fdb()) {
            wait.signal();
            dispatch_st = merge_delete_bitmap(rowset_id, delete_bitmap_storages[i].delete_bitmap());
        } else {
            dispatch_st = token->submit_func([&]() {
                auto status = get_delete_bitmap_from_file(rowset_id);
                if (!status.ok()) {
                    LOG(WARNING) << "failed to get delete bitmap for tablet_id="
                                 << tablet->tablet_id() << ", rowset_id=" << rowset_id
                                 << " from file, st=" << status.to_string();
                    record_failure(status);
                }
                wait.signal();
            });
            if (!dispatch_st.ok()) {
                // The task was rejected and will never run: count it down here.
                wait.signal();
            }
        }
        if (!dispatch_st.ok()) {
            // Do not return here: tasks already submitted still reference this
            // frame. Record the error, release the countdown for the entries
            // that will not be dispatched, and fall through to the joins below.
            record_failure(dispatch_st);
            for (int j = i + 1; j < rowset_ids.size(); j++) {
                wait.signal();
            }
            break;
        }
    }
    // wait for all finished
    wait.wait();
    token->wait();
    return result;
}
// Sends a prepare_rowset RPC for `rs_meta` (schema omitted from the payload).
//
// If the RPC fails with ALREADY_EXISTED, returns Status::AlreadyExist and, when
// `existed_rs_meta` is non-null and the response carries one, stores the
// existing rowset meta into `*existed_rs_meta`. Otherwise returns the RPC status.
Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
                                    RowsetMetaSharedPtr* existed_rs_meta) {
    VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
               << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
    {
        Status ret_st;
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st);
    }

    CreateRowsetRequest request;
    CreateRowsetResponse response;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_txn_id(rs_meta.txn_id());
    request.set_tablet_job_id(job_id);
    doris_rowset_meta_to_cloud(request.mutable_rowset_meta(),
                               rs_meta.get_rowset_pb(/*skip_schema=*/true));

    Status rpc_st =
            retry_rpc("prepare rowset", request, &response, &MetaService_Stub::prepare_rowset);
    const bool already_existed =
            !rpc_st.ok() && response.status().code() == MetaServiceCode::ALREADY_EXISTED;
    if (!already_existed) {
        return rpc_st;
    }

    if (existed_rs_meta != nullptr && response.has_existed_rowset_meta()) {
        RowsetMetaPB existing_pb =
                cloud_rowset_meta_to_doris(std::move(*response.mutable_existed_rowset_meta()));
        *existed_rs_meta = std::make_shared<RowsetMeta>();
        (*existed_rs_meta)->init_from_pb(existing_pb);
    }
    return Status::AlreadyExist("failed to prepare rowset: {}", response.status().msg());
}
// Sends a commit_rowset RPC for `rs_meta` and then hands the rowset to the
// warm-up manager.
//
// If the RPC fails with ALREADY_EXISTED, returns Status::AlreadyExist (filling
// `*existed_rs_meta` when it is non-null and the response carries the existing
// meta) without triggering warm-up. In every other case warm-up is triggered
// and the RPC status is returned.
Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
                                   RowsetMetaSharedPtr* existed_rs_meta) {
    VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
               << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
    {
        Status ret_st;
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st);
    }
    check_table_size_correctness(rs_meta);

    CreateRowsetRequest request;
    CreateRowsetResponse response;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_txn_id(rs_meta.txn_id());
    request.set_tablet_job_id(job_id);
    doris_rowset_meta_to_cloud(request.mutable_rowset_meta(), rs_meta.get_rowset_pb());

    Status rpc_st =
            retry_rpc("commit rowset", request, &response, &MetaService_Stub::commit_rowset);
    const bool already_existed =
            !rpc_st.ok() && response.status().code() == MetaServiceCode::ALREADY_EXISTED;
    if (already_existed) {
        if (existed_rs_meta != nullptr && response.has_existed_rowset_meta()) {
            RowsetMetaPB existing_pb =
                    cloud_rowset_meta_to_doris(std::move(*response.mutable_existed_rowset_meta()));
            *existed_rs_meta = std::make_shared<RowsetMeta>();
            (*existed_rs_meta)->init_from_pb(existing_pb);
        }
        return Status::AlreadyExist("failed to commit rowset: {}", response.status().msg());
    }

    // -1 is passed through unchanged unless the delay-commit path below applies.
    int64_t timeout_ms = -1;
    // A non-empty `job_id` means this rowset was produced by a compaction job.
    if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty()) {
        // Estimate the download time assuming 100MB/s, double it for safety,
        // then clamp it between the configured minimum and maximum so that
        // small rowsets do not end up with a tiny timeout.
        const double speed_mbps = 100.0; // 100MB/s
        const double safety_factor = 2.0;
        const auto estimated_ms =
                static_cast<int64_t>(static_cast<double>(rs_meta.data_disk_size()) /
                                     (speed_mbps * 1024 * 1024) * safety_factor * 1000);
        const int64_t at_least_min =
                std::max(estimated_ms, config::warm_up_rowset_sync_wait_min_timeout_ms);
        timeout_ms = std::min(at_least_min, config::warm_up_rowset_sync_wait_max_timeout_ms);
        LOG(INFO) << "warm up rowset: " << rs_meta.version() << ", job_id: " << job_id
                  << ", with timeout: " << timeout_ms << " ms";
    }
    auto& warm_up_mgr =
            ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
    warm_up_mgr.warm_up_rowset(rs_meta, timeout_ms);
    return rpc_st;
}
// Sends an update_tmp_rowset RPC carrying the current state of `rs_meta`.
//
// A failure with ROWSET_META_NOT_FOUND is reported as an InternalError built
// from the meta-service message; any other outcome returns the RPC status.
Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
    VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id()
               << ", rowset_id: " << rs_meta.rowset_id();

    CreateRowsetRequest request;
    CreateRowsetResponse response;
    request.set_cloud_unique_id(config::cloud_unique_id);

    // The variant schema may have been updated (the rowset meta is merged via
    // `rowset->merge_rowset_meta` in `BaseTablet::update_delete_bitmap`), and
    // that update would be lost if the schema were skipped here. So the schema
    // is only skipped when the tablet schema has no variant columns.
    const bool skip_schema = rs_meta.tablet_schema()->num_variant_columns() == 0;
    doris_rowset_meta_to_cloud(request.mutable_rowset_meta(), rs_meta.get_rowset_pb(skip_schema));

    Status rpc_st = retry_rpc("update committed rowset", request, &response,
                              &MetaService_Stub::update_tmp_rowset);
    const bool meta_missing =
            !rpc_st.ok() && response.status().code() == MetaServiceCode::ROWSET_META_NOT_FOUND;
    if (meta_missing) {
        return Status::InternalError("failed to update committed rowset: {}",
                                     response.status().msg());
    }
    return rpc_st;
}
// Asynchronously sends the table stats carried in `res` to the master FE.
// This runs in the stream-load context, where the user should get a response
// as soon as possible, so the report is handed to the send-table-stats thread
// pool instead of being sent inline.
//
// The report is best effort: serialization, submission and RPC failures are
// only logged, and the task's own return status is not consumed by anyone.
//
// @param db_id   database id put into the report
// @param txn_id  transaction id put into the report
// @param label   load label put into the report
// @param res     commit-txn response, serialized as the report payload
static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
                                   const std::string& label, CommitTxnResponse& res) {
    std::string protobufBytes;
    if (!res.SerializeToString(&protobufBytes)) {
        LOG(WARNING) << "TableStats report to FE skipped, failed to serialize response"
                     << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label;
        return;
    }
    // The payload is moved into the closure so it is not copied a second time.
    auto st = ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func(
            [db_id, txn_id, label, payload = std::move(protobufBytes)]() -> Status {
                TReportCommitTxnResultRequest request;
                TStatus result;
                if (payload.empty()) {
                    LOG(WARNING) << "protobufBytes: " << payload.length();
                    return Status::OK(); // nobody cares the return status
                }
                request.__set_dbId(db_id);
                request.__set_txnId(txn_id);
                request.__set_label(label);
                request.__set_payload(payload);

                Status status;
                int64_t duration_ns = 0;
                TNetworkAddress master_addr =
                        ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
                if (master_addr.hostname.empty() || master_addr.port == 0) {
                    status = Status::Error<SERVICE_UNAVAILABLE>(
                            "Have not get FE Master heartbeat yet");
                } else {
                    SCOPED_RAW_TIMER(&duration_ns);
                    // Keep the RPC error in `status` rather than returning it:
                    // the return value of this task is dropped, so an early
                    // return would hide the failure from the log below and skip
                    // the latency metric.
                    status = ThriftRpcHelper::rpc<FrontendServiceClient>(
                            master_addr.hostname, master_addr.port,
                            [&request, &result](FrontendServiceConnection& client) {
                                client->reportCommitTxnResult(result, request);
                            });
                    if (status.ok()) {
                        status = Status::create<false>(result);
                    }
                }
                g_cloud_commit_txn_resp_redirect_latency << duration_ns / 1000;

                if (!status.ok()) {
                    LOG(WARNING) << "TableStats report RPC to FE failed, errmsg=" << status
                                 << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label;
                } else {
                    LOG(INFO) << "TableStats report RPC to FE success, msg=" << status
                              << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label;
                }
                return Status::OK(); // nobody cares the return status
            });
    if (!st.ok()) {
        LOG(WARNING) << "TableStats report to FE task submission failed: " << st.to_string();
    }
}
// Commits the transaction described by `ctx` through the meta-service.
// On success the response is additionally reported to the FE asynchronously.
// Returns the status of the commit RPC.
Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
    VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
               << ", label: " << ctx.label << ", is_2pc: " << is_2pc;
    {
        Status ret_st;
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st);
    }

    CommitTxnRequest request;
    CommitTxnResponse response;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_db_id(ctx.db_id);
    request.set_txn_id(ctx.txn_id);
    request.set_is_2pc(is_2pc);
    request.set_enable_txn_lazy_commit(config::enable_cloud_txn_lazy_commit);

    auto commit_st = retry_rpc("commit txn", request, &response, &MetaService_Stub::commit_txn);
    if (!commit_st.ok()) {
        return commit_st;
    }
    send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, response);
    return commit_st;
}
// Aborts the transaction described by `ctx` through the meta-service.
//
// The transaction is addressed by (db_id, label) when both are usable,
// otherwise by txn_id. When neither is available an InternalError is returned
// without sending any RPC. The abort reason is the first 1024 characters of
// `ctx.status`'s message.
Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
    VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
               << ", label: " << ctx.label;
    {
        Status ret_st;
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st);
    }

    AbortTxnRequest request;
    AbortTxnResponse response;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_reason(std::string(ctx.status.msg().substr(0, 1024)));

    const bool has_label_key = ctx.db_id > 0 && !ctx.label.empty();
    const bool has_txn_id = ctx.txn_id > 0;
    if (!has_label_key && !has_txn_id) {
        LOG(WARNING) << "failed abort txn, with illegal input, db_id=" << ctx.db_id
                     << " txn_id=" << ctx.txn_id << " label=" << ctx.label;
        return Status::InternalError<false>("failed to abort txn");
    }

    if (has_label_key) {
        request.set_db_id(ctx.db_id);
        request.set_label(ctx.label);
    } else {
        request.set_txn_id(ctx.txn_id);
    }
    return retry_rpc("abort txn", request, &response, &MetaService_Stub::abort_txn);
}
// Sends a precommit_txn RPC for the transaction identified by `ctx.db_id` and
// `ctx.txn_id`, and returns the RPC status.
Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
    VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
               << ", label: " << ctx.label;
    {
        Status ret_st;
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st);
    }

    PrecommitTxnRequest request;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_db_id(ctx.db_id);
    request.set_txn_id(ctx.txn_id);

    PrecommitTxnResponse response;
    return retry_rpc("precommit txn", request, &response, &MetaService_Stub::precommit_txn);
}
// Sends the PREPARE action of a restore job for the tablet described by
// `tablet_meta`, using the configured snapshot expiration, and returns the RPC
// status.
Status CloudMetaMgr::prepare_restore_job(const TabletMetaPB& tablet_meta) {
    VLOG_DEBUG << "prepare restore job, tablet_id: " << tablet_meta.tablet_id();
    RestoreJobRequest req;
    RestoreJobResponse resp;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_tablet_id(tablet_meta.tablet_id());
    req.set_expiration(config::snapshot_expire_time_sec);
    req.set_action(RestoreJobRequest::PREPARE);
    // `tablet_meta` is a const reference, so it can only be copied; wrapping it
    // in std::move would not move anything and would only mislead the reader.
    doris_tablet_meta_to_cloud(req.mutable_tablet_meta(), tablet_meta);
    return retry_rpc("prepare restore job", req, &resp, &MetaService_Stub::prepare_restore_job);
}
// Sends the COMMIT action of a restore job for `tablet_id`, tagged with the
// configured delete bitmap write version, and returns the RPC status.
Status CloudMetaMgr::commit_restore_job(const int64_t tablet_id) {
    VLOG_DEBUG << "commit restore job, tablet_id: " << tablet_id;

    RestoreJobRequest request;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_tablet_id(tablet_id);
    request.set_action(RestoreJobRequest::COMMIT);
    request.set_store_version(config::delete_bitmap_store_write_version);

    RestoreJobResponse response;
    return retry_rpc("commit restore job", request, &response,
                     &MetaService_Stub::commit_restore_job);
}
// Finishes the restore job of `tablet_id`: sends COMPLETE when `is_completed`
// is true and ABORT otherwise. Returns the RPC status.
Status CloudMetaMgr::finish_restore_job(const int64_t tablet_id, bool is_completed) {
    VLOG_DEBUG << "finish restore job, tablet_id: " << tablet_id
               << ", is_completed: " << is_completed;

    RestoreJobRequest request;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_tablet_id(tablet_id);
    if (is_completed) {
        request.set_action(RestoreJobRequest::COMPLETE);
    } else {
        request.set_action(RestoreJobRequest::ABORT);
    }

    RestoreJobResponse response;
    return retry_rpc("finish restore job", request, &response,
                     &MetaService_Stub::finish_restore_job);
}
// Fetches the object-store / storage-vault configuration from the meta-service.
//
// On success, appends one entry to `*vault_infos` per legacy object store and
// per storage vault (HDFS and/or object-store backed) found in the response,
// and stores the response's `enable_storage_vault` flag in `*is_vault_mode`.
// The response is logged afterwards with its credentials masked.
//
// Returns the RPC status when the RPC fails, Status::OK() otherwise.
Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode) {
    GetObjStoreInfoRequest req;
    GetObjStoreInfoResponse resp;
    req.set_cloud_unique_id(config::cloud_unique_id);
    Status s =
            retry_rpc("get storage vault info", req, &resp, &MetaService_Stub::get_obj_store_info);
    if (!s.ok()) {
        return s;
    }
    *is_vault_mode = resp.enable_storage_vault();

    auto add_obj_store = [&vault_infos](const auto& obj_store) {
        vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store),
                                  StorageVaultPB_PathFormat {});
    };
    std::ranges::for_each(resp.obj_info(), add_obj_store);
    std::ranges::for_each(resp.storage_vault(), [&](const auto& vault) {
        if (vault.has_hdfs_info()) {
            vault_infos->emplace_back(vault.id(), vault.hdfs_info(), vault.path_format());
        }
        if (vault.has_obj_info()) {
            add_obj_store(vault.obj_info());
        }
    });

    // Desensitization. This must happen only after `vault_infos` has been
    // filled from the unmasked response, and must cover both credentials of
    // both kinds of entries: the secret key keeps its first two characters, the
    // access key is masked from the access key itself (not from the secret key).
    auto mask_credentials = [](auto* obj_info) {
        obj_info->set_sk(obj_info->sk().substr(0, 2) + "xxx");
        obj_info->set_ak(hide_access_key(obj_info->ak()));
    };
    for (auto& obj_info : *resp.mutable_obj_info()) {
        mask_credentials(&obj_info);
    }
    for (auto& vault : *resp.mutable_storage_vault()) {
        if (!vault.has_obj_info()) {
            continue;
        }
        mask_credentials(vault.mutable_obj_info());
    }

    LOG(INFO) << "get storage vault, enable_storage_vault=" << *is_vault_mode
              << " response=" << resp.ShortDebugString();
    return Status::OK();
}
// Starts the tablet job `job` on the meta-service; the response is written to
// `*res`. Returns the status of the start_tablet_job RPC.
Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) {
    VLOG_DEBUG << "prepare_tablet_job: " << job.ShortDebugString();
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_tablet_job", Status::OK(), job, res);

    StartTabletJobRequest request;
    *request.mutable_job() = job;
    request.set_cloud_unique_id(config::cloud_unique_id);
    return retry_rpc("start tablet job", request, res, &MetaService_Stub::start_tablet_job);
}
// Commits the tablet job `job` on the meta-service; the response is written to
// `*res`.
//
// When the meta-service reports KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES the
// result is mapped to DELETE_BITMAP_LOCK_ERROR; otherwise the RPC status is
// returned.
Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) {
    VLOG_DEBUG << "commit_tablet_job: " << job.ShortDebugString();
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_tablet_job", Status::OK(), job, res);
    DBUG_EXECUTE_IF("CloudMetaMgr::commit_tablet_job.fail", {
        return Status::InternalError<false>("inject CloudMetaMgr::commit_tablet_job.fail");
    });

    FinishTabletJobRequest request;
    *request.mutable_job() = job;
    request.set_action(FinishTabletJobRequest::COMMIT);
    request.set_cloud_unique_id(config::cloud_unique_id);

    auto rpc_st = retry_rpc("commit tablet job", request, res, &MetaService_Stub::finish_tablet_job);
    const bool txn_conflict =
            res->status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES;
    if (!txn_conflict) {
        return rpc_st;
    }
    return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
            "txn conflict when commit tablet job {}", job.ShortDebugString());
}
// Sends the ABORT action for the tablet job `job` and returns the RPC status.
// The response is not exposed to the caller.
Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) {
    VLOG_DEBUG << "abort_tablet_job: " << job.ShortDebugString();

    FinishTabletJobRequest request;
    *request.mutable_job() = job;
    request.set_action(FinishTabletJobRequest::ABORT);
    request.set_cloud_unique_id(config::cloud_unique_id);

    FinishTabletJobResponse response;
    return retry_rpc("abort tablet job", request, &response, &MetaService_Stub::finish_tablet_job);
}
// Sends the LEASE action for the tablet job `job` and returns the RPC status.
// The response is not exposed to the caller.
Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) {
    VLOG_DEBUG << "lease_tablet_job: " << job.ShortDebugString();

    FinishTabletJobRequest request;
    *request.mutable_job() = job;
    request.set_action(FinishTabletJobRequest::LEASE);
    request.set_cloud_unique_id(config::cloud_unique_id);

    FinishTabletJobResponse response;
    return retry_rpc("lease tablet job", request, &response, &MetaService_Stub::finish_tablet_job);
}
Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator, DeleteBitmap* delete_bitmap,
DeleteBitmap* delete_bitmap_v2, std::string rowset_id,
std::optional<StorageResource> storage_resource,
int64_t store_version, int64_t txn_id,
bool is_explicit_txn, int64_t next_visible_version) {
VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id();
if (config::enable_mow_verbose_log) {
std::stringstream ss;
ss << "start update delete bitmap for tablet_id: " << tablet.tablet_id()
<< ", rowset_id: " << rowset_id
<< ", delete_bitmap num: " << delete_bitmap->delete_bitmap.size()
<< ", store_version: " << store_version << ", lock_id=" << lock_id
<< ", initiator=" << initiator;
if (store_version == 2 || store_version == 3) {
ss << ", delete_bitmap v2 num: " << delete_bitmap_v2->delete_bitmap.size();
}
LOG(INFO) << ss.str();
}
UpdateDeleteBitmapRequest req;
UpdateDeleteBitmapResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_table_id(tablet.table_id());
req.set_partition_id(tablet.partition_id());
req.set_tablet_id(tablet.tablet_id());
req.set_lock_id(lock_id);
req.set_initiator(initiator);
req.set_is_explicit_txn(is_explicit_txn);
if (txn_id > 0) {
req.set_txn_id(txn_id);
}
if (next_visible_version > 0) {
req.set_next_visible_version(next_visible_version);
}
req.set_store_version(store_version);
bool write_v1 = store_version == 1 || store_version == 3;
bool write_v2 = store_version == 2 || store_version == 3;
// write v1 kvs
if (write_v1) {
for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
req.add_rowset_ids(std::get<0>(key).to_string());
req.add_segment_ids(std::get<1>(key));
req.add_versions(std::get<2>(key));
// To save space, convert array and bitmap containers to run containers
bitmap.runOptimize();
std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
bitmap.write(bitmap_data.data());
*(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
}
}
// write v2 kvs
if (write_v2) {
auto add_delete_bitmap = [](DeleteBitmapPB& delete_bitmap_pb,
const DeleteBitmap::BitmapKey& key, roaring::Roaring& bitmap) {
delete_bitmap_pb.add_rowset_ids(std::get<0>(key).to_string());
delete_bitmap_pb.add_segment_ids(std::get<1>(key));
delete_bitmap_pb.add_versions(std::get<2>(key));
// To save space, convert array and bitmap containers to run containers
bitmap.runOptimize();
std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
bitmap.write(bitmap_data.data());
*(delete_bitmap_pb.add_segment_delete_bitmaps()) = std::move(bitmap_data);
};
auto store_delete_bitmap = [&](std::string& rowset_id, DeleteBitmapPB& delete_bitmap_pb) {
if (config::enable_mow_verbose_log) {
std::stringstream ss;
for (int i = 0; i < delete_bitmap_pb.rowset_ids_size(); i++) {
ss << "{rid=" << delete_bitmap_pb.rowset_ids(i)
<< ", sid=" << delete_bitmap_pb.segment_ids(i)
<< ", ver=" << delete_bitmap_pb.versions(i) << "}, ";
}
LOG(INFO) << "handle one rowset delete bitmap for tablet_id: " << tablet.tablet_id()
<< ", rowset_id: " << rowset_id
<< ", delete_bitmap num: " << delete_bitmap_pb.rowset_ids_size()
<< ", size: " << delete_bitmap_pb.ByteSizeLong() << ", keys=["
<< ss.str() << "]";
}
if (delete_bitmap_pb.rowset_ids_size() == 0) {
return Status::OK();
}
DeleteBitmapStoragePB delete_bitmap_storage;
if (config::delete_bitmap_store_v2_max_bytes_in_fdb >= 0 &&
delete_bitmap_pb.ByteSizeLong() > config::delete_bitmap_store_v2_max_bytes_in_fdb) {
DeleteBitmapFileWriter file_writer(tablet.tablet_id(), rowset_id, storage_resource);
RETURN_IF_ERROR(file_writer.init());
RETURN_IF_ERROR(file_writer.write(delete_bitmap_pb));
RETURN_IF_ERROR(file_writer.close());
delete_bitmap_pb.Clear();
delete_bitmap_storage.set_store_in_fdb(false);
} else {
delete_bitmap_storage.set_store_in_fdb(true);
*(delete_bitmap_storage.mutable_delete_bitmap()) = std::move(delete_bitmap_pb);
}
req.add_delta_rowset_ids(rowset_id);
*(req.add_delete_bitmap_storages()) = std::move(delete_bitmap_storage);
return Status::OK();
};
if (config::enable_mow_verbose_log) {
LOG(INFO) << "update delete bitmap for tablet_id: " << tablet.tablet_id()
<< ", rowset_id: " << rowset_id
<< ", delete_bitmap num: " << delete_bitmap_v2->delete_bitmap.size()
<< ", lock_id=" << lock_id << ", initiator=" << initiator;
}
if (rowset_id.empty()) {
std::string pre_rowset_id = "";
std::string cur_rowset_id = "";
DeleteBitmapPB delete_bitmap_pb;
for (auto it = delete_bitmap_v2->delete_bitmap.begin();
it != delete_bitmap_v2->delete_bitmap.end(); ++it) {
auto& key = it->first;
auto& bitmap = it->second;
cur_rowset_id = std::get<0>(key).to_string();
if (cur_rowset_id != pre_rowset_id) {
if (!pre_rowset_id.empty() && delete_bitmap_pb.rowset_ids_size() > 0) {
RETURN_IF_ERROR(store_delete_bitmap(pre_rowset_id, delete_bitmap_pb));
}
pre_rowset_id = cur_rowset_id;
DCHECK_EQ(delete_bitmap_pb.rowset_ids_size(), 0);
DCHECK_EQ(delete_bitmap_pb.segment_ids_size(), 0);
DCHECK_EQ(delete_bitmap_pb.versions_size(), 0);
DCHECK_EQ(delete_bitmap_pb.segment_delete_bitmaps_size(), 0);
}
add_delete_bitmap(delete_bitmap_pb, key, bitmap);
}
if (delete_bitmap_pb.rowset_ids_size() > 0) {
DCHECK(!cur_rowset_id.empty());
RETURN_IF_ERROR(store_delete_bitmap(cur_rowset_id, delete_bitmap_pb));
}
} else {
DeleteBitmapPB delete_bitmap_pb;
for (auto& [key, bitmap] : delete_bitmap_v2->delete_bitmap) {
add_delete_bitmap(delete_bitmap_pb, key, bitmap);
}
RETURN_IF_ERROR(store_delete_bitmap(rowset_id, delete_bitmap_pb));
}
DCHECK_EQ(req.delta_rowset_ids_size(), req.delete_bitmap_storages_size());
}
DBUG_EXECUTE_IF("CloudMetaMgr::test_update_big_delete_bitmap", {
LOG(INFO) << "test_update_big_delete_bitmap for tablet " << tablet.tablet_id();
auto count = dp->param<int>("count", 30000);
if (!delete_bitmap->delete_bitmap.empty()) {
auto& key = delete_bitmap->delete_bitmap.begin()->first;
auto& bitmap = delete_bitmap->delete_bitmap.begin()->second;
for (int i = 1000; i < (1000 + count); i++) {
req.add_rowset_ids(std::get<0>(key).to_string());
req.add_segment_ids(std::get<1>(key));
req.add_versions(i);
// To save space, convert array and bitmap containers to run containers
bitmap.runOptimize();
std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
bitmap.write(bitmap_data.data());
*(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
}
}
});
DBUG_EXECUTE_IF("CloudMetaMgr::test_update_delete_bitmap_fail", {
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
"test update delete bitmap failed, tablet_id: {}, lock_id: {}", tablet.tablet_id(),
lock_id);
});
auto st = retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap);
if (config::enable_update_delete_bitmap_kv_check_core &&
res.status().code() == MetaServiceCode::UPDATE_OVERRIDE_EXISTING_KV) {
auto& msg = res.status().msg();
LOG_WARNING(msg);
CHECK(false) << msg;
}
if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) {
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
"lock expired when update delete bitmap, tablet_id: {}, lock_id: {}, initiator: "
"{}, error_msg: {}",
tablet.tablet_id(), lock_id, initiator, res.status().msg());
}
return st;
}
// Uploads the v1 delete bitmaps held in `delete_bitmap` to the meta service via the
// update_delete_bitmap RPC, with `without_lock` set on the request.
//
// Returns OK without issuing any RPC when config::delete_bitmap_store_write_version is 2.
//
// Parameters:
//   tablet                        - supplies the table/partition/tablet ids of the request.
//   delete_bitmap                 - bitmaps to upload; every bitmap is run-optimized in place
//                                   before it is serialized.
//   rowset_to_versions            - rowset id string -> pre-rowset version. Only consulted when
//                                   pre_rowset_agg_end_version > 0. It is never modified here.
//   pre_rowset_agg_start_version,
//   pre_rowset_agg_end_version    - copied into the request only when both are > 0.
//
// Returns the status of the RPC.
Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock(
        const CloudTablet& tablet, DeleteBitmap* delete_bitmap,
        std::map<std::string, int64_t>& rowset_to_versions, int64_t pre_rowset_agg_start_version,
        int64_t pre_rowset_agg_end_version) {
    if (config::delete_bitmap_store_write_version == 2) {
        VLOG_DEBUG << "no need to agg delete bitmap v1 in ms because use v2";
        return Status::OK();
    }
    LOG(INFO) << "cloud_update_delete_bitmap_without_lock, tablet_id: " << tablet.tablet_id()
              << ", delete_bitmap size: " << delete_bitmap->delete_bitmap.size();
    UpdateDeleteBitmapRequest req;
    UpdateDeleteBitmapResponse res;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_table_id(tablet.table_id());
    req.set_partition_id(tablet.partition_id());
    req.set_tablet_id(tablet.tablet_id());
    // use a fake lock id to resolve compatibility issues
    req.set_lock_id(-3);
    req.set_without_lock(true);
    const bool with_pre_rowset_versions = pre_rowset_agg_end_version > 0;
    for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
        // Build the rowset id string once per entry: it is needed for the request, for the
        // map lookup and for the diagnostic message.
        const std::string rowset_id_str = std::get<0>(key).to_string();
        req.add_rowset_ids(rowset_id_str);
        req.add_segment_ids(std::get<1>(key));
        req.add_versions(std::get<2>(key));
        if (with_pre_rowset_versions) {
            // Use find() rather than operator[] so a missing rowset does not silently insert
            // a default entry into the caller's map when DCHECK is compiled out. A missing
            // entry is still reported as version 0, which is what operator[] used to yield.
            const auto it = rowset_to_versions.find(rowset_id_str);
            DCHECK(it != rowset_to_versions.end())
                    << "rowset_to_versions not found for key=" << rowset_id_str;
            req.add_pre_rowset_versions(it != rowset_to_versions.end() ? it->second : 0);
        }
        DCHECK(pre_rowset_agg_end_version <= 0 || pre_rowset_agg_end_version == std::get<2>(key))
                << "pre_rowset_agg_end_version=" << pre_rowset_agg_end_version
                << " not equal to version=" << std::get<2>(key);
        // To save space, convert array and bitmap containers to run containers
        bitmap.runOptimize();
        std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
        bitmap.write(bitmap_data.data());
        *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
    }
    if (pre_rowset_agg_start_version > 0 && pre_rowset_agg_end_version > 0) {
        req.set_pre_rowset_agg_start_version(pre_rowset_agg_start_version);
        req.set_pre_rowset_agg_end_version(pre_rowset_agg_end_version);
    }
    return retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap);
}
// Requests the delete bitmap update lock of the tablet's table from the meta service.
//
// While the meta service answers LOCK_CONFLICT the request is re-sent after a random
// 500-2000 ms pause, until the retry counter exceeds
// config::get_delete_bitmap_lock_max_retry_times. The total time spent pausing is reported to
// g_cloud_be_mow_get_dbm_lock_backoff_sleep_time.
//
// Returns DELETE_BITMAP_LOCK_ERROR when the final response code is
// KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES or LOCK_CONFLICT; otherwise the status of the last
// RPC.
Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
                                                   int64_t initiator) {
    // Debug point: fail with the configured probability before doing any work.
    DBUG_EXECUTE_IF("get_delete_bitmap_update_lock.inject_fail", {
        auto fail_ratio = dp->param("percent", 0.01);
        std::mt19937 fail_gen {std::random_device {}()};
        std::bernoulli_distribution should_fail {fail_ratio};
        if (should_fail(fail_gen)) {
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
                    "injection error when get get_delete_bitmap_update_lock, "
                    "tablet_id={}, lock_id={}, initiator={}",
                    tablet.tablet_id(), lock_id, initiator);
        }
    });
    VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id()
               << ",lock_id:" << lock_id;

    GetDeleteBitmapUpdateLockRequest req;
    GetDeleteBitmapUpdateLockResponse res;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_table_id(tablet.table_id());
    req.set_lock_id(lock_id);
    req.set_initiator(initiator);
    // set expiration time for compaction and schema_change
    req.set_expiration(config::delete_bitmap_lock_expiration_seconds);

    Status rpc_status;
    int retry_times = 0;
    uint64_t backoff_sleep_time_ms {0};
    std::default_random_engine random_engine = make_random_engine();
    std::uniform_int_distribution<uint32_t> backoff_ms_dist(500, 2000);
    while (true) {
        bool test_conflict = false;
        rpc_status = retry_rpc("get delete bitmap update lock", req, &res,
                               &MetaService_Stub::get_delete_bitmap_update_lock);
        DBUG_EXECUTE_IF("CloudMetaMgr::test_get_delete_bitmap_update_lock_conflict",
                        { test_conflict = true; });
        const bool conflicted =
                test_conflict || res.status().code() == MetaServiceCode::LOCK_CONFLICT;
        if (!conflicted) {
            break;
        }
        // Back off for a random duration before asking again.
        uint32_t duration_ms = backoff_ms_dist(random_engine);
        LOG(WARNING) << "get delete bitmap lock conflict. " << debug_info(req)
                     << " retry_times=" << retry_times << " sleep=" << duration_ms
                     << "ms : " << res.status().msg();
        auto sleep_begin = std::chrono::steady_clock::now();
        bthread_usleep(duration_ms * 1000);
        auto sleep_end = std::chrono::steady_clock::now();
        backoff_sleep_time_ms +=
                duration_cast<std::chrono::milliseconds>(sleep_end - sleep_begin).count();
        if (++retry_times > config::get_delete_bitmap_lock_max_retry_times) {
            break;
        }
    }
    g_cloud_be_mow_get_dbm_lock_backoff_sleep_time << backoff_sleep_time_ms;

    // Debug point: with the configured probability, sleep for "sleep" seconds (15 unless the
    // debug point overrides it) after the lock request has finished.
    DBUG_EXECUTE_IF("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep", {
        auto sleep_ratio = dp->param("percent", 0.01);
        auto sleep_time = dp->param("sleep", 15);
        std::mt19937 sleep_gen {std::random_device {}()};
        std::bernoulli_distribution should_sleep {sleep_ratio};
        if (should_sleep(sleep_gen)) {
            LOG_INFO("injection sleep for {} seconds, tablet_id={}", sleep_time,
                     tablet.tablet_id());
            std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
        }
    });

    // Map the meta service codes that callers must see as lock errors.
    if (res.status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) {
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
                "txn conflict when get delete bitmap update lock, table_id {}, lock_id {}, "
                "initiator {}",
                tablet.table_id(), lock_id, initiator);
    }
    if (res.status().code() == MetaServiceCode::LOCK_CONFLICT) {
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
                "lock conflict when get delete bitmap update lock, table_id {}, lock_id {}, "
                "initiator {}",
                tablet.table_id(), lock_id, initiator);
    }
    return rpc_status;
}
void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id,
int64_t initiator, int64_t tablet_id) {
LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id
<< ",lock_id:" << lock_id << ",initiator:" << initiator << ",tablet_id:" << tablet_id;
RemoveDeleteBitmapUpdateLockRequest req;
RemoveDeleteBitmapUpdateLockResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_table_id(table_id);
req.set_tablet_id(tablet_id);
req.set_lock_id(lock_id);
req.set_initiator(initiator);
auto st = retry_rpc("remove delete bitmap update lock", req, &res,
&MetaService_Stub::remove_delete_bitmap_update_lock);
if (!st.ok()) {
LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << table_id
<< ",tablet_id=" << tablet_id << ",lock_id=" << lock_id
<< ",st=" << st.to_string();
}
}
// Cross-checks the sizes recorded in `rs_meta` against the sizes of the files reported by the
// storage backend. Does nothing unless config::enable_table_size_correctness_check is set.
// On any mismatch a warning with all compared values is logged and DCHECK(false) fires.
void CloudMetaMgr::check_table_size_correctness(RowsetMeta& rs_meta) {
    if (!config::enable_table_size_correctness_check) {
        return;
    }
    const int64_t real_data_size = get_segment_file_size(rs_meta);
    const int64_t real_index_size = get_inverted_index_file_size(rs_meta);

    const bool data_size_matches = rs_meta.data_disk_size() == real_data_size;
    const bool index_size_matches = rs_meta.index_disk_size() == real_index_size;
    const bool total_size_matches =
            rs_meta.data_disk_size() + rs_meta.index_disk_size() == rs_meta.total_disk_size();
    if (data_size_matches && index_size_matches && total_size_matches) {
        return;
    }

    LOG(WARNING) << "[Cloud table table size check failed]:"
                 << " tablet id: " << rs_meta.tablet_id()
                 << ", rowset id:" << rs_meta.rowset_id()
                 << ", rowset data disk size:" << rs_meta.data_disk_size()
                 << ", rowset real data disk size:" << real_data_size
                 << ", rowset index disk size:" << rs_meta.index_disk_size()
                 << ", rowset real index disk size:" << real_index_size
                 << ", rowset total disk size:" << rs_meta.total_disk_size()
                 << ", rowset segment path:"
                 << StorageResource().remote_segment_path(rs_meta.tablet_id(),
                                                          rs_meta.rowset_id().to_string(), 0);
    DCHECK(false);
}
// Sums the sizes of all segment files of the rowset described by `rs_meta`, as reported by the
// rowset's file system.
//
// A segment whose size cannot be obtained contributes 0 and is logged (INFO when the file does
// not exist, WARNING for any other failure). When the rowset has no file system the failure is
// logged and 0 is returned.
int64_t CloudMetaMgr::get_segment_file_size(RowsetMeta& rs_meta) {
    int64_t total_segment_size = 0;
    const auto fs = rs_meta.fs();
    if (!fs) {
        // Without a file system no size can be queried; bail out instead of dereferencing a
        // null pointer in the loop below.
        LOG(WARNING) << "get fs failed, resource_id=" << rs_meta.resource_id();
        return total_segment_size;
    }
    for (int64_t seg_id = 0; seg_id < rs_meta.num_segments(); seg_id++) {
        std::string segment_path = StorageResource().remote_segment_path(
                rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
        int64_t segment_file_size = 0;
        auto st = fs->file_size(segment_path, &segment_file_size);
        if (!st.ok()) {
            // Do not trust whatever a failed call left in the out parameter.
            segment_file_size = 0;
            if (st.is<NOT_FOUND>()) {
                LOG(INFO) << "cloud table size correctness check get segment size 0 because "
                             "file not exist! msg:"
                          << st.msg() << ", segment path:" << segment_path;
            } else {
                LOG(WARNING) << "cloud table size correctness check get segment size failed! msg:"
                             << st.msg() << ", segment path:" << segment_path;
            }
        }
        total_segment_size += segment_file_size;
    }
    return total_segment_size;
}
// Sums the sizes of all inverted index files of the rowset described by `rs_meta`, as reported
// by the rowset's file system.
//
// With storage format V1 one file per (index, segment) pair is queried; otherwise one file per
// segment. A file whose size cannot be obtained contributes 0 and is logged (INFO when the file
// does not exist, WARNING for any other failure). When the rowset has no file system the
// failure is logged and 0 is returned.
int64_t CloudMetaMgr::get_inverted_index_file_size(RowsetMeta& rs_meta) {
    int64_t total_inverted_index_size = 0;
    const auto fs = rs_meta.fs();
    if (!fs) {
        // Without a file system no size can be queried; bail out instead of dereferencing a
        // null pointer below.
        LOG(WARNING) << "get fs failed, resource_id=" << rs_meta.resource_id();
        return total_inverted_index_size;
    }

    // Returns the size of the index file at `index_path`, or 0 after logging when the query
    // fails. `format_name` ("v1"/"v2") only selects the wording of the log message.
    auto index_file_size_or_zero = [&fs](const std::string& index_path,
                                         const char* format_name) -> int64_t {
        int64_t file_size = 0;
        auto st = fs->file_size(index_path, &file_size);
        if (st.ok()) {
            return file_size;
        }
        if (st.is<NOT_FOUND>()) {
            LOG(INFO) << "cloud table size correctness check get inverted index " << format_name
                      << " 0 because file not exist! msg:" << st.msg()
                      << ", inverted index path:" << index_path;
        } else {
            LOG(WARNING) << "cloud table size correctness check get inverted index "
                         << format_name << " size failed! msg:" << st.msg()
                         << ", inverted index path:" << index_path;
        }
        return 0;
    };

    if (rs_meta.tablet_schema()->get_inverted_index_storage_format() ==
        InvertedIndexStorageFormatPB::V1) {
        const auto& indices = rs_meta.tablet_schema()->inverted_indexes();
        for (auto& index : indices) {
            for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
                std::string segment_path = StorageResource().remote_segment_path(
                        rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
                std::string inverted_index_file_path =
                        InvertedIndexDescriptor::get_index_file_path_v1(
                                InvertedIndexDescriptor::get_index_file_path_prefix(segment_path),
                                index->index_id(), index->get_index_suffix());
                total_inverted_index_size +=
                        index_file_size_or_zero(inverted_index_file_path, "v1");
            }
        }
    } else {
        for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
            std::string segment_path = StorageResource().remote_segment_path(
                    rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
            std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
                    InvertedIndexDescriptor::get_index_file_path_prefix(segment_path));
            total_inverted_index_size += index_file_size_or_zero(inverted_index_file_path, "v2");
        }
    }
    return total_inverted_index_size;
}
// Adds empty placeholder rowsets to `tablet` for every version in [0, max_version] that is not
// covered by one of the rowset metas currently recorded in the tablet meta.
//
// Nothing is done when max_version <= 0 or when the tablet has no rowset metas at all. The
// placeholders are handed to tablet->add_rowsets() together with `wlock`, and their number is
// reported to g_cloud_version_hole_filled_count. Returns the first error produced while
// creating a placeholder, otherwise OK.
Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version,
                                        std::unique_lock<std::shared_mutex>& wlock) {
    if (max_version <= 0) {
        return Status::OK();
    }

    Versions known_versions;
    for (const auto& [_, rs_meta] : tablet->tablet_meta()->all_rs_metas()) {
        known_versions.emplace_back(rs_meta->version());
    }
    // If there are no existing versions, it may be a new tablet for restore, so skip filling holes.
    if (known_versions.empty()) {
        return Status::OK();
    }

    std::vector<RowsetSharedPtr> hole_rowsets;
    // Ascending by start version; comparing only `first` is enough because two versions are
    // certainly not overlapping.
    std::sort(known_versions.begin(), known_versions.end(),
              [](const Version& lhs, const Version& rhs) { return lhs.first < rhs.first; });

    // During schema change, get_tablet operations on new tablets trigger sync_tablet_rowsets which
    // calls fill_version_holes. For schema change tablets (TABLET_NOTREADY state), hole filling is
    // skipped for versions <= alter_version to prevent:
    // 1. Abnormal compaction score calculations for schema change tablets
    // 2. Unexpected -235 errors during load operations
    // Holes beyond the alter_version threshold are still filled.
    const bool in_schema_change = tablet->tablet_state() == TABLET_NOTREADY;
    if (in_schema_change && tablet->alter_version() <= 1) {
        LOG(INFO) << "Skip version hole filling for new schema change tablet "
                  << tablet->tablet_id() << " with alter_version " << tablet->alter_version();
        return Status::OK();
    }

    // True when `ver` must not be filled because the tablet is still in schema change.
    const auto skipped_by_schema_change = [&](int64_t ver) {
        return in_schema_change && ver <= tablet->alter_version();
    };
    // Suffix appended to the "Created empty rowset" log lines.
    const auto schema_change_note = [&]() -> std::string {
        if (!in_schema_change) {
            return "";
        }
        return ", schema change tablet skipped filling versions <= " +
               std::to_string(tablet->alter_version());
    };

    // End version of the last existing rowset visited so far.
    int64_t covered_until = -1;
    for (const Version& existing : known_versions) {
        VLOG_NOTICE << "Existing version for tablet " << tablet->tablet_id() << ": ["
                    << existing.first << ", " << existing.second << "]";
        const int64_t hole_begin = covered_until + 1;
        if (existing.first > hole_begin) {
            // Gap in front of `existing`: use that rowset as the template for the placeholders.
            auto template_rowset = tablet->get_rowset_by_version(existing);
            for (int64_t ver = hole_begin; ver < existing.first; ++ver) {
                if (skipped_by_schema_change(ver)) {
                    continue;
                }
                RowsetSharedPtr hole_rowset;
                RETURN_IF_ERROR(create_empty_rowset_for_hole(
                        tablet, ver, template_rowset->rowset_meta(), &hole_rowset));
                hole_rowsets.push_back(hole_rowset);
            }
            LOG(INFO) << "Created empty rowset for version hole, from " << hole_begin << " to "
                      << existing.first - 1 << " for tablet " << tablet->tablet_id()
                      << schema_change_note();
        }
        covered_until = existing.second;
    }

    // Gap between the last existing version and max_version.
    const int64_t tail_begin = covered_until + 1;
    if (tail_begin <= max_version) {
        LOG(INFO) << "Created empty rowset for version hole, from " << tail_begin << " to "
                  << max_version << " for tablet " << tablet->tablet_id()
                  << schema_change_note();
        for (int64_t ver = tail_begin; ver <= max_version; ++ver) {
            if (skipped_by_schema_change(ver)) {
                continue;
            }
            RowsetSharedPtr hole_rowset;
            auto template_rowset = tablet->get_rowset_by_version(known_versions.back());
            RETURN_IF_ERROR(create_empty_rowset_for_hole(
                    tablet, ver, template_rowset->rowset_meta(), &hole_rowset));
            hole_rowsets.push_back(hole_rowset);
        }
    }

    if (hole_rowsets.empty()) {
        return Status::OK();
    }
    // Take the count first: the vector is moved into add_rowsets().
    const size_t filled = hole_rowsets.size();
    tablet->add_rowsets(std::move(hole_rowsets), false, wlock, false);
    g_cloud_version_hole_filled_count << filled;
    return Status::OK();
}
// Builds an empty, VISIBLE rowset covering exactly [version, version] for `tablet` and stores
// it in `*rowset`, flagged as a hole rowset.
//
// Schema, rowset type, schema hash and resource id are copied from `prev_rowset_meta`; the
// rowset id and load id are derived from (tablet id, version) only. Returns the error of
// RowsetFactory::create_rowset when the rowset cannot be created.
Status CloudMetaMgr::create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version,
                                                  RowsetMetaSharedPtr prev_rowset_meta,
                                                  RowsetSharedPtr* rowset) {
    auto hole_meta = std::make_shared<RowsetMeta>();
    RowsetMeta& meta = *hole_meta;
    const auto tablet_id = tablet->tablet_id();

    // Deterministic rowset id: same tablet_id + version = same rowset_id.
    RowsetId deterministic_rowset_id;
    deterministic_rowset_id.init(2, 0, tablet_id, version);
    meta.set_rowset_id(deterministic_rowset_id);

    // Deterministic load id: same tablet_id + version = same load_id.
    PUniqueId deterministic_load_id;
    deterministic_load_id.set_hi(tablet_id);
    deterministic_load_id.set_lo(version);
    meta.set_load_id(deterministic_load_id);

    // Properties inherited from the template rowset.
    meta.set_tablet_schema(prev_rowset_meta->tablet_schema());
    meta.set_rowset_type(prev_rowset_meta->rowset_type());
    meta.set_tablet_schema_hash(prev_rowset_meta->tablet_schema_hash());
    meta.set_resource_id(prev_rowset_meta->resource_id());

    // Identity of the owning tablet.
    meta.set_tablet_id(tablet_id);
    meta.set_index_id(tablet->index_id());
    meta.set_partition_id(tablet->partition_id());
    meta.set_tablet_uid(tablet->tablet_uid());

    // A single-version rowset that holds no data at all.
    meta.set_version(Version(version, version));
    meta.set_txn_id(version);
    meta.set_num_rows(0);
    meta.set_total_disk_size(0);
    meta.set_data_disk_size(0);
    meta.set_index_disk_size(0);
    meta.set_empty(true);
    meta.set_num_segments(0);
    meta.set_segments_overlap(NONOVERLAPPING);
    meta.set_rowset_state(VISIBLE);
    meta.set_creation_time(UnixSeconds());
    meta.set_newest_write_timestamp(UnixSeconds());

    Status create_status = RowsetFactory::create_rowset(nullptr, "", hole_meta, rowset);
    if (create_status.ok()) {
        (*rowset)->set_hole_rowset(true);
        return Status::OK();
    }
    LOG_WARNING("Failed to create empty rowset for hole")
            .tag("tablet_id", tablet_id)
            .tag("version", version)
            .error(create_status);
    return create_status;
}
// Fetches the snapshots known to the meta service, aborted ones included, and appends them to
// `snapshots`. Entries already in `snapshots` are kept. Returns the RPC error, if any.
Status CloudMetaMgr::list_snapshot(std::vector<SnapshotInfoPB>& snapshots) {
    ListSnapshotRequest request;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_include_aborted(true);

    ListSnapshotResponse response;
    RETURN_IF_ERROR(
            retry_rpc("list snapshot", request, &response, &MetaService_Stub::list_snapshot));

    const int snapshot_count = response.snapshots_size();
    for (int idx = 0; idx < snapshot_count; ++idx) {
        snapshots.push_back(response.snapshots(idx));
    }
    return Status::OK();
}
// Reads the snapshot settings of this instance from the meta service (get_instance RPC).
//
// A field missing from the response falls back to its default: SNAPSHOT_SWITCH_DISABLED for
// `switch_status`, 0 for `max_reserved_snapshots`, 3600 for `snapshot_interval_seconds`.
// The outputs are left untouched when the RPC fails, and the RPC error is returned.
Status CloudMetaMgr::get_snapshot_properties(SnapshotSwitchStatus& switch_status,
                                             int64_t& max_reserved_snapshots,
                                             int64_t& snapshot_interval_seconds) {
    GetInstanceRequest request;
    GetInstanceResponse response;
    request.set_cloud_unique_id(config::cloud_unique_id);
    RETURN_IF_ERROR(retry_rpc("get snapshot properties", request, &response,
                              &MetaService_Stub::get_instance));

    const auto& instance = response.instance();

    if (instance.has_snapshot_switch_status()) {
        switch_status = instance.snapshot_switch_status();
    } else {
        switch_status = SnapshotSwitchStatus::SNAPSHOT_SWITCH_DISABLED;
    }

    if (instance.has_max_reserved_snapshot()) {
        max_reserved_snapshots = instance.max_reserved_snapshot();
    } else {
        max_reserved_snapshots = 0;
    }

    if (instance.has_snapshot_interval_seconds()) {
        snapshot_interval_seconds = instance.snapshot_interval_seconds();
    } else {
        snapshot_interval_seconds = 3600;
    }
    return Status::OK();
}
// Sends `packed_file_info` for the packed file at `packed_file_path` to the meta service
// (update_packed_file_info RPC) and returns the status of that RPC.
Status CloudMetaMgr::update_packed_file_info(const std::string& packed_file_path,
                                             const cloud::PackedFileInfoPB& packed_file_info) {
    VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with "
               << packed_file_info.total_slice_num() << " small files"
               << ", total bytes: " << packed_file_info.total_slice_bytes();

    cloud::UpdatePackedFileInfoRequest request;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_packed_file_path(packed_file_path);
    // The request carries its own copy of the caller's message.
    request.mutable_packed_file_info()->CopyFrom(packed_file_info);

    cloud::UpdatePackedFileInfoResponse response;
    return retry_rpc("update packed file info", request, &response,
                     &cloud::MetaService_Stub::update_packed_file_info);
}
#include "common/compile_check_end.h"
} // namespace doris::cloud