blob: bb815ce859a24e363f3b512d851e6d38ca4fd981 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <brpc/controller.h>
#include <bthread/bthread.h>
#include <bthread/countdown_event.h>
#include <bthread/mutex.h>
#include <bthread/types.h>
#include <bvar/latency_recorder.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <ranges>
#include <sstream>
#include <utility>
#include "cloud/config.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "olap/base_tablet.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_reader.h"
#include "runtime/client_cache.h"
#include "service/backend_options.h"
#include "service/internal_service.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
namespace doris {
using namespace ErrorCode;
using namespace std::ranges;
// Elapsed time of one get_tablet_rowsets request to a single peer BE, fed with a
// MonotonicMicros() difference by GetRowsetsCntl::start_req_bg().
static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_single_request_latency(
"remote_fetch_rowsets_single_rpc");
// Elapsed time of a whole BaseTablet::_remote_capture_rowsets() call, fed with a
// MonotonicMicros() difference when that function returns.
static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_latency("remote_fetch_rowsets");
// Asks the timestamped version tracker for a consistent chain of versions covering
// `version_range`.
//
// Returns the version path on success. When the tracker fails:
//   * `options.quiet` suppresses the diagnostics (missed-version lookup and logging) but the
//     tracker error is still returned to the caller;
//   * if no versions are reported as missed, VERSION_ALREADY_MERGED is returned;
//   * otherwise the tracker error is returned, unless `options.skip_missing_versions` is set,
//     in which case whatever path the tracker produced is returned after a warning.
//
// The caller is expected to hold the tablet header lock (hence `_unlocked`).
[[nodiscard]] Result<std::vector<Version>> BaseTablet::capture_consistent_versions_unlocked(
        const Version& version_range, const CaptureRowsetOps& options) const {
    std::vector<Version> version_path;
    auto st =
            _timestamped_version_tracker.capture_consistent_versions(version_range, &version_path);
    if (!st) {
        if (options.quiet) {
            // Quiet only means "do not log"; the failure itself must not be reported as a
            // successful (possibly empty) version path.
            return ResultError(std::move(st));
        }
        auto missed_versions = get_missed_versions_unlocked(version_range.second);
        if (missed_versions.empty()) {
            LOG(WARNING) << fmt::format(
                    "version already has been merged. version_range={}, max_version={}, "
                    "tablet_id={}",
                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id());
            return ResultError(Status::Error<VERSION_ALREADY_MERGED>(
                    "missed versions is empty, version_range={}, max_version={}, tablet_id={}",
                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id()));
        }
        LOG(WARNING) << fmt::format("missed version for version_range={}, tablet_id={}, st={}",
                                    version_range.to_string(), tablet_id(), st);
        _print_missed_versions(missed_versions);
        if (!options.skip_missing_versions) {
            return ResultError(std::move(st));
        }
        LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id();
    }
    // Debug point: lets tests force a VERSION_ALREADY_MERGED failure for one tablet
    // (tablet_id == this tablet) or for every tablet (tablet_id == -2).
    DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", {
        auto tablet_id = dp->param<int64_t>("tablet_id", -1);
        auto skip_by_option = dp->param<bool>("skip_by_option", false);
        if (skip_by_option && !options.enable_fetch_rowsets_from_peers) {
            return version_path;
        }
        if ((tablet_id != -1 && (tablet_id == _tablet_meta->tablet_id())) || tablet_id == -2) {
            return ResultError(Status::Error<VERSION_ALREADY_MERGED>("version already merged"));
        }
    });
    return version_path;
}
// Resolves `version_range` into the rowsets that cover it.
//
// The version path is computed locally first. If that fails, and the process runs in cloud
// mode with `options.enable_fetch_rowsets_from_peers` set, the rowsets are requested from peer
// replicas instead; a failure of that fallback is reported as VERSION_ALREADY_MERGED.
// For merge-on-write unique-key tablets the locally captured result also carries the tablet's
// delete bitmap.
[[nodiscard]] Result<CaptureRowsetResult> BaseTablet::capture_consistent_rowsets_unlocked(
        const Version& version_range, const CaptureRowsetOps& options) const {
    auto maybe_versions = capture_consistent_versions_unlocked(version_range, options);

    if (!maybe_versions) {
        // Local capture failed: either give up or fall back to the peers.
        const bool may_ask_peers =
                config::is_cloud_mode() && options.enable_fetch_rowsets_from_peers;
        if (!may_ask_peers) {
            return ResultError(std::move(maybe_versions.error()));
        }
        auto remote = _remote_capture_rowsets(version_range);
        if (remote) {
            return remote;
        }
        return ResultError(Status::Error<VERSION_ALREADY_MERGED>(
                "version already merged, meet error during remote capturing rowsets, "
                "error={}, version_range={}",
                remote.error().to_string(), version_range.to_string()));
    }

    // Looks a version up in the active rowset map and, when allowed, in the stale one.
    auto lookup_rowset = [this](const Version& wanted,
                                bool search_stale) -> Result<RowsetSharedPtr> {
        const auto active = _rs_version_map.find(wanted);
        if (active != _rs_version_map.end()) {
            return active->second;
        }
        VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" << tablet_id()
                    << ", version='" << wanted.first << "-" << wanted.second;
        if (search_stale) {
            const auto stale = _stale_rs_version_map.find(wanted);
            if (stale != _stale_rs_version_map.end()) {
                return stale->second;
            }
            LOG(WARNING) << fmt::format(
                    "fail to find Rowset in stale_rs_version for version. tablet={}, "
                    "version={}-{}",
                    tablet_id(), wanted.first, wanted.second);
        }
        return ResultError(Status::Error<CAPTURE_ROWSET_ERROR>(
                "failed to find rowset for version={}", wanted.to_string()));
    };

    CaptureRowsetResult captured;
    const auto& path = maybe_versions.value();
    captured.rowsets.reserve(path.size());
    for (const auto& step : path) {
        auto found = lookup_rowset(step, options.include_stale_rowsets);
        if (!found) {
            return ResultError(std::move(found.error()));
        }
        captured.rowsets.push_back(std::move(found.value()));
    }

    const bool is_mow =
            keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write();
    if (is_mow) {
        captured.delete_bitmap = _tablet_meta->delete_bitmap_ptr();
    }
    return captured;
}
// Captures the rowsets covering `version_range` and creates one reader per rowset.
//
// Returns the capture error unchanged if the rowsets cannot be captured, or
// CAPTURE_ROWSET_READER_ERROR as soon as a rowset fails to create its reader.
[[nodiscard]] Result<std::vector<RowSetSplits>> BaseTablet::capture_rs_readers_unlocked(
        const Version& version_range, const CaptureRowsetOps& options) const {
    auto captured = capture_consistent_rowsets_unlocked(version_range, options);
    if (!captured) {
        return ResultError(std::move(captured.error()));
    }

    const auto& rowsets = captured.value().rowsets;
    std::vector<RowSetSplits> splits;
    splits.reserve(rowsets.size());
    for (const auto& rowset : rowsets) {
        RowsetReaderSharedPtr reader;
        if (auto st = rowset->create_reader(&reader); !st) {
            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
                    "failed to create reader for rowset={}, reason={}",
                    rowset->rowset_id().to_string(), st.to_string()));
        }
        splits.emplace_back(std::move(reader));
    }
    return splits;
}
// Builds a TabletReadSource for `version_range` while holding the header lock in shared mode.
//
// The read source receives the delete bitmap of the captured result and one rowset reader per
// captured rowset. Returns the capture error unchanged, or CAPTURE_ROWSET_READER_ERROR when a
// rowset cannot create its reader.
[[nodiscard]] Result<TabletReadSource> BaseTablet::capture_read_source(
        const Version& version_range, const CaptureRowsetOps& options) {
    std::shared_lock rdlock(get_header_lock());

    auto maybe_captured = capture_consistent_rowsets_unlocked(version_range, options);
    if (!maybe_captured) {
        return ResultError(std::move(maybe_captured.error()));
    }
    auto captured = std::move(maybe_captured.value());

    TabletReadSource source;
    source.delete_bitmap = std::move(captured.delete_bitmap);
    source.rs_splits.reserve(captured.rowsets.size());
    for (const auto& rowset : captured.rowsets) {
        RowsetReaderSharedPtr reader;
        if (auto st = rowset->create_reader(&reader); !st) {
            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
                    "failed to create reader for rowset={}, reason={}",
                    rowset->rowset_id().to_string(), st.to_string()));
        }
        source.rs_splits.emplace_back(std::move(reader));
    }
    return source;
}
// Starts a background bthread that invokes `fn(args...)`.
//
// `fn` and `args` are copied into a heap-allocated closure which the new bthread deletes after
// running it. `th` receives the id of the started bthread.
//
// Returns true if the bthread was started. On failure the closure is released here, because
// the trampoline that would normally delete it never runs.
template <typename Fn, typename... Args>
bool call_bthread(bthread_t& th, const bthread_attr_t* attr, Fn&& fn, Args&&... args) {
    auto p_wrap_fn = new auto([=] { fn(args...); });
    auto call_back = [](void* ar) -> void* {
        // `ar` is the closure pointer handed to bthread_start_background below.
        auto f = static_cast<decltype(p_wrap_fn)>(ar);
        (*f)();
        delete f;
        return nullptr;
    };
    if (bthread_start_background(&th, attr, call_back, p_wrap_fn) != 0) {
        delete p_wrap_fn;
        return false;
    }
    return true;
}
// Fans a get_tablet_rowsets request out to every address in `req_addrs`, one bthread per peer,
// and lets a single waiter collect the outcome.
//
// Completion protocol (all state below `private:` is guarded by `butex`):
//   * the first peer whose RPC and response status are both OK decides the result, sets
//     `done` and signals `event`; later tasks observe `done` and return immediately;
//   * a failing peer only decrements `task_cnt`; the task that brings it to zero publishes
//     the error and signals `event`;
//   * `result` is only ever switched to an error together with `done = true`, so a task that
//     sees `done == false` can rely on `result` still holding a value.
//
// Instances must be owned by a std::shared_ptr: every task keeps the controller alive through
// shared_from_this().
struct GetRowsetsCntl : std::enable_shared_from_this<GetRowsetsCntl> {
    // Payload produced by a successful peer response.
    struct RemoteGetRowsetResult {
        std::vector<RowsetMetaSharedPtr> rowsets;
        // Only set when the response carries a delete bitmap; nullptr otherwise.
        std::unique_ptr<DeleteBitmap> delete_bitmap;
    };

    // Starts one background bthread per peer address.
    // Returns InternalError if a bthread cannot be created; tasks that were already started
    // keep running and keep the controller alive until they finish.
    Status start_req_bg() {
        task_cnt = req_addrs.size();
        for (const auto& [ip, port] : req_addrs) {
            bthread_t tid;
            bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
            // The address is captured by value so the task does not depend on `req_addrs`
            // staying untouched while it runs.
            bool succ = call_bthread(tid, &attr, [self = shared_from_this(), ip = ip,
                                                  port = port]() {
                LOG(INFO) << "start to get tablet rowsets from peer BE, ip=" << ip;
                Defer defer_log {[&ip, port]() {
                    LOG(INFO) << "finish to get rowsets from peer BE, ip=" << ip
                              << ", port=" << port;
                }};
                PGetTabletRowsetsRequest req;
                req.set_tablet_id(self->tablet_id);
                req.set_version_start(self->version_range.first);
                req.set_version_end(self->version_range.second);
                if (self->delete_bitmap_keys.has_value()) {
                    req.mutable_delete_bitmap_keys()->CopyFrom(self->delete_bitmap_keys.value());
                }
                brpc::Controller cntl;
                cntl.set_timeout_ms(60000);
                cntl.set_max_retry(3);
                PGetTabletRowsetsResponse response;
                // Non-OK when no stub could be obtained, i.e. the RPC was never issued.
                Status stub_st = Status::OK();
                auto start_tm_us = MonotonicMicros();
#ifndef BE_TEST
                std::shared_ptr<PBackendService_Stub> stub =
                        ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(ip, port);
                if (stub == nullptr) {
                    stub_st = Status::InternalError(
                            "failed to fetch get_tablet_rowsets stub, ip={}, port={}", ip, port);
                } else {
                    stub->get_tablet_rowsets(&cntl, &req, &response, nullptr);
                }
#else
                TEST_SYNC_POINT_CALLBACK("get_tablet_rowsets", &response);
#endif
                if (stub_st.ok()) {
                    g_remote_fetch_tablet_rowsets_single_request_latency
                            << MonotonicMicros() - start_tm_us;
                }
                std::unique_lock l(self->butex);
                if (self->done) {
                    return;
                }
                --self->task_cnt;
                // A missing stub is accounted for like any other failed peer, so the waiter is
                // always woken up once every task has reported.
                Status resp_st = std::move(stub_st);
                if (resp_st.ok()) {
                    resp_st = Status::create(response.status());
                }
                DBUG_EXECUTE_IF("GetRowsetCntl::start_req_bg.inject_failure",
                                { resp_st = Status::InternalError("inject error"); });
                if (cntl.Failed() || !resp_st) {
                    if (self->task_cnt != 0) {
                        // Other peers are still pending and may succeed.
                        return;
                    }
                    std::stringstream reason;
                    reason << "failed to get rowsets from all replicas, tablet_id="
                           << self->tablet_id;
                    if (cntl.Failed()) {
                        reason << ", reason=[" << cntl.ErrorCode() << "] " << cntl.ErrorText();
                    } else {
                        reason << ", reason=" << resp_st.to_string();
                    }
                    self->result = ResultError(Status::InternalError(reason.str()));
                    self->done = true;
                    self->event.signal();
                    return;
                }
                // From here on this task decides the outcome, whether or not parsing succeeds.
                Defer done_cb {[&]() {
                    self->done = true;
                    self->event.signal();
                }};
                std::vector<RowsetMetaSharedPtr> rs_metas;
                for (auto&& rs_pb : response.rowsets()) {
                    auto rs_meta = std::make_shared<RowsetMeta>();
                    if (!rs_meta->init_from_pb(rs_pb)) {
                        self->result =
                                ResultError(Status::InternalError("failed to init rowset from pb"));
                        return;
                    }
                    rs_metas.push_back(std::move(rs_meta));
                }
                self->result->rowsets = std::move(rs_metas);
                if (response.has_delete_bitmap()) {
                    self->result->delete_bitmap = std::make_unique<DeleteBitmap>(
                            DeleteBitmap::from_pb(response.delete_bitmap(), self->tablet_id));
                }
            });
            if (!succ) {
                return Status::InternalError(
                        "failed to create bthread when request rowsets for tablet={}", tablet_id);
            }
        }
        return Status::OK();
    }

    // Blocks until a task signals completion, then moves the published result out.
    Result<RemoteGetRowsetResult> wait_for_ret() {
        event.wait();
        return std::move(result);
    }

    // Request parameters; fill these in before calling start_req_bg().
    int64_t tablet_id;
    std::vector<std::pair<std::string, int32_t>> req_addrs;
    Version version_range;
    std::optional<DeleteBitmapPB> delete_bitmap_keys = std::nullopt;

private:
    // Number of peer tasks that have not reported yet.
    size_t task_cnt = 0;
    bthread::Mutex butex;
    bthread::CountdownEvent event {1};
    // True once `result` is final and `event` has been (or is about to be) signalled.
    bool done = false;
    Result<RemoteGetRowsetResult> result;
};
// Asks the master FE for the replicas of `tablet_id` and returns the (host, brpc_port) of
// every replica that is not this backend.
//
// A replica counts as local when its host equals BackendOptions::get_localhost(); replicas
// without a host are skipped as well. The debug point
// "get_peer_replicas_address.enable_local_host" disables the filtering altogether.
//
// Returns InternalError if the FE RPC fails or the response has no entry for the tablet.
// The returned list may be empty.
Result<std::vector<std::pair<std::string, int32_t>>> get_peer_replicas_addresses(
        const int64_t tablet_id) {
    auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
    DCHECK_NE(cluster_info, nullptr);
    auto master_addr = cluster_info->master_fe_addr;
    TGetTabletReplicaInfosRequest req;
    req.tablet_ids.push_back(tablet_id);
    TGetTabletReplicaInfosResult resp;
    auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&](FrontendServiceConnection& client) { client->getTabletReplicaInfos(resp, req); });
    if (!st) {
        return ResultError(Status::InternalError(
                "failed to get tablet replica infos, rpc error={}, tablet_id={}", st.to_string(),
                tablet_id));
    }
    auto it = resp.tablet_replica_infos.find(tablet_id);
    if (it == resp.tablet_replica_infos.end()) {
        return ResultError(Status::InternalError("replicas not found, tablet_id={}", tablet_id));
    }
    // `resp` is local to this function, so hosts can be moved out of it directly.
    auto& replicas = it->second;
    const auto local_host = BackendOptions::get_localhost();
    bool include_local_host = false;
    DBUG_EXECUTE_IF("get_peer_replicas_address.enable_local_host", { include_local_host = true; });

    std::vector<std::pair<std::string, int32_t>> peers;
    peers.reserve(replicas.size());
    for (auto& replica : replicas) {
        // Compare whole host strings: a substring test would treat e.g. peer "10.0.0.1" as
        // local on a backend whose own address is "10.0.0.12".
        const bool is_local = replica.host.empty() || replica.host == local_host;
        if (is_local && !include_local_host) {
            continue;
        }
        peers.emplace_back(std::move(replica.host), replica.brpc_port);
    }
    return peers;
}
// Fetches the rowsets covering `version_range` from peer replicas of this tablet.
//
// Steps:
//   1. resolve the peer addresses (via a sync point under BE_TEST);
//   2. for merge-on-write unique-key tablets, snapshot the local delete bitmap and send its
//      keys along with the request;
//   3. fan the request out through GetRowsetsCntl and wait for its outcome;
//   4. turn every returned rowset meta into a Rowset, and merge the delete bitmap returned by
//      the peer (if any) into the local snapshot.
//
// Returns InternalError when no peer address is available or the peers could not serve the
// request, or the RowsetFactory error if a rowset cannot be created.
// The total elapsed time is recorded in g_remote_fetch_tablet_rowsets_latency on every exit.
Result<CaptureRowsetResult> BaseTablet::_remote_capture_rowsets(
        const Version& version_range) const {
    auto start_tm_us = MonotonicMicros();
    Defer defer {
            [&]() { g_remote_fetch_tablet_rowsets_latency << MonotonicMicros() - start_tm_us; }};
#ifndef BE_TEST
    auto maybe_be_addresses = get_peer_replicas_addresses(tablet_id());
#else
    Result<std::vector<std::pair<std::string, int32_t>>> maybe_be_addresses;
    TEST_SYNC_POINT_CALLBACK("get_peer_replicas_addresses", &maybe_be_addresses);
#endif
    DBUG_EXECUTE_IF("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail",
                    { maybe_be_addresses = ResultError(Status::InternalError("inject failure")); });
    if (!maybe_be_addresses) {
        return ResultError(std::move(maybe_be_addresses.error()));
    }
    auto be_addresses = std::move(maybe_be_addresses.value());
    if (be_addresses.empty()) {
        LOG(WARNING) << "no peers replica for tablet=" << tablet_id();
        return ResultError(Status::InternalError("no replicas for tablet={}", tablet_id()));
    }
    auto cntl = std::make_shared<GetRowsetsCntl>();
    cntl->tablet_id = tablet_id();
    cntl->req_addrs = std::move(be_addresses);
    cntl->version_range = version_range;
    bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write();
    CaptureRowsetResult result;
    if (is_mow) {
        result.delete_bitmap =
                std::make_unique<DeleteBitmap>(_tablet_meta->delete_bitmap().snapshot());
        // Send the keys of the local snapshot with the request.
        DeleteBitmapPB delete_bitmap_keys;
        auto keyset = result.delete_bitmap->delete_bitmap |
                      std::views::transform([](const auto& kv) { return kv.first; });
        for (const auto& key : keyset) {
            const auto& [rs_id, seg_id, version] = key;
            delete_bitmap_keys.mutable_rowset_ids()->Add(rs_id.to_string());
            delete_bitmap_keys.mutable_segment_ids()->Add(seg_id);
            delete_bitmap_keys.mutable_versions()->Add(version);
        }
        cntl->delete_bitmap_keys = std::move(delete_bitmap_keys);
    }
    RETURN_IF_ERROR_RESULT(cntl->start_req_bg());
    auto maybe_meta = cntl->wait_for_ret();
    if (!maybe_meta) {
        auto err = Status::InternalError(
                "tried to get rowsets from peer replicas and failed, "
                "reason={}",
                maybe_meta.error());
        return ResultError(std::move(err));
    }
    auto& remote_meta = maybe_meta.value();
    const auto& rs_metas = remote_meta.rowsets;
    result.rowsets.reserve(rs_metas.size());
    for (const auto& rs_meta : rs_metas) {
        RowsetSharedPtr rs;
        auto st = RowsetFactory::create_rowset(_tablet_meta->tablet_schema(), {}, rs_meta, &rs);
        if (!st) {
            return ResultError(std::move(st));
        }
        result.rowsets.push_back(std::move(rs));
    }
    if (is_mow) {
        DCHECK_NE(result.delete_bitmap, nullptr);
        // The controller only fills delete_bitmap when the response carries one, so it can
        // legitimately be null here; in that case the local snapshot is returned as is.
        if (remote_meta.delete_bitmap != nullptr) {
            result.delete_bitmap->merge(*remote_meta.delete_bitmap);
        } else {
            LOG(WARNING) << "peer replica returned no delete bitmap, tablet=" << tablet_id()
                         << ", version_range=" << version_range.to_string();
        }
    }
    return result;
}
} // namespace doris