blob: abdc1f9e321da93d5d9263b993297caeed6a0c47 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "http/action/download_binlog_action.h"
#include <fmt/format.h>
#include <fmt/ranges.h>
#include <cstdint>
#include <stdexcept>
#include <string_view>
#include <utility>
#include "common/config.h"
#include "common/logging.h"
#include "http/http_channel.h"
#include "http/http_request.h"
#include "http/utils.h"
#include "io/fs/local_file_system.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/exec_env.h"
#include "util/stopwatch.hpp"
namespace doris {
namespace {
const std::string kMethodParameter = "method";
const std::string kTokenParameter = "token";
const std::string kTabletIdParameter = "tablet_id";
const std::string kBinlogVersionParameter = "binlog_version";
const std::string kRowsetIdParameter = "rowset_id";
const std::string kSegmentIndexParameter = "segment_index";
const std::string kSegmentIndexIdParameter = "segment_index_id";
const std::string kAcquireMD5Parameter = "acquire_md5";
bvar::LatencyRecorder g_get_binlog_info_latency("doris_download_binlog", "get_binlog_info");
bvar::LatencyRecorder g_get_segment_file_latency("doris_download_binlog", "get_segment_file");
bvar::LatencyRecorder g_get_segment_index_file_latency("doris_download_binlog",
"get_segment_index_file");
bvar::LatencyRecorder g_get_rowset_meta_latency("doris_download_binlog", "get_rowset_meta");
// get http param, if no value throw exception
const auto& get_http_param(HttpRequest* req, const std::string& param_name) {
const auto& param = req->param(param_name);
if (param.empty()) {
auto error_msg = fmt::format("parameter {} not specified in url.", param_name);
throw std::runtime_error(error_msg);
}
return param;
}
auto get_tablet(StorageEngine& engine, const std::string& tablet_id_str) {
int64_t tablet_id = std::atoll(tablet_id_str.data());
TabletSharedPtr tablet = engine.tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
auto error = fmt::format("tablet is not exist, tablet_id={}", tablet_id);
LOG(WARNING) << error;
throw std::runtime_error(error);
}
return tablet;
}
// need binlog_version, tablet_id
void handle_get_binlog_info(StorageEngine& engine, HttpRequest* req) {
try {
const auto& binlog_version = get_http_param(req, kBinlogVersionParameter);
const auto& tablet_id = get_http_param(req, kTabletIdParameter);
auto tablet = get_tablet(engine, tablet_id);
const auto& [rowset_id, num_segments] = tablet->get_binlog_info(binlog_version);
if (rowset_id.empty()) {
HttpChannel::send_reply(
req, HttpStatus::NOT_FOUND,
fmt::format("get binlog info failed, binlog_version={}", binlog_version));
} else if (num_segments < 0) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
fmt::format("invalid num_segments: {}", num_segments));
} else {
auto binlog_info_msg = fmt::format("{}:{}", rowset_id, num_segments);
HttpChannel::send_reply(req, binlog_info_msg);
}
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
LOG(WARNING) << "get binlog info failed, error: " << e.what();
return;
}
}
/// handle get segment file, need tablet_id, rowset_id && index
void handle_get_segment_file(StorageEngine& engine, HttpRequest* req,
bufferevent_rate_limit_group* rate_limit_group) {
// Step 1: get download file path
std::string segment_file_path;
bool is_acquire_md5 = false;
try {
const auto& tablet_id = get_http_param(req, kTabletIdParameter);
auto tablet = get_tablet(engine, tablet_id);
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
LOG(WARNING) << "get download file path failed, error: " << e.what();
return;
}
// Step 2: handle download
// check file exists
bool exists = false;
Status status = io::global_local_filesystem()->exists(segment_file_path, &exists);
if (!status.ok()) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string());
LOG(WARNING) << "check file exists failed, error: " << status.to_string();
return;
}
if (!exists) {
HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "file not exist.");
LOG(WARNING) << "file not exist, file path: " << segment_file_path;
return;
}
do_file_response(segment_file_path, req, rate_limit_group, is_acquire_md5);
}
/// handle get segment index file, need tablet_id, rowset_id, segment_index && segment_index_id
void handle_get_segment_index_file(StorageEngine& engine, HttpRequest* req,
bufferevent_rate_limit_group* rate_limit_group) {
// Step 1: get download file path
std::string segment_index_file_path;
bool is_acquire_md5 = false;
try {
const auto& tablet_id = get_http_param(req, kTabletIdParameter);
auto tablet = get_tablet(engine, tablet_id);
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
auto segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
if (tablet->tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
// now CCR not support for variant + index v1
constexpr std::string_view index_suffix = "";
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
std::stoll(segment_index_id), index_suffix);
} else {
DCHECK(segment_index_id == "-1");
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
}
is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
LOG(WARNING) << "get download file path failed, error: " << e.what();
return;
}
// Step 2: handle download
// check file exists
bool exists = false;
Status status = io::global_local_filesystem()->exists(segment_index_file_path, &exists);
if (!status.ok()) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string());
LOG(WARNING) << "check file exists failed, error: " << status.to_string();
return;
}
if (!exists) {
HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "file not exist.");
LOG(WARNING) << "file not exist, file path: " << segment_index_file_path;
return;
}
do_file_response(segment_index_file_path, req, rate_limit_group, is_acquire_md5);
}
void handle_get_rowset_meta(StorageEngine& engine, HttpRequest* req) {
try {
const auto& tablet_id = get_http_param(req, kTabletIdParameter);
auto tablet = get_tablet(engine, tablet_id);
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& binlog_version = get_http_param(req, kBinlogVersionParameter);
auto rowset_meta = tablet->get_rowset_binlog_meta(binlog_version, rowset_id);
if (rowset_meta.empty()) {
HttpChannel::send_reply(req, HttpStatus::NOT_FOUND,
fmt::format("get rowset meta failed, rowset_id={}", rowset_id));
} else {
HttpChannel::send_reply(req, rowset_meta);
}
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
LOG(WARNING) << "get download file path failed, error: " << e.what();
}
}
} // namespace
DownloadBinlogAction::DownloadBinlogAction(
ExecEnv* exec_env, StorageEngine& engine,
std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group)
: HttpHandlerWithAuth(exec_env),
_engine(engine),
_rate_limit_group(std::move(rate_limit_group)) {}
void DownloadBinlogAction::handle(HttpRequest* req) {
VLOG_CRITICAL << "accept one download binlog request " << req->debug_string();
if (!config::enable_feature_binlog) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
"binlog feature is not enabled.");
return;
}
// Step 1: check token
Status status;
if (config::enable_token_check) {
// FIXME(Drogon): support check token
// status = _check_token(req);
if (!status.ok()) {
HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, status.to_string());
return;
}
}
// Step 2: get method
const std::string& method = req->param(kMethodParameter);
// Step 3: dispatch
MonotonicStopWatch watch;
watch.start();
if (method == "get_binlog_info") {
handle_get_binlog_info(_engine, req);
g_get_binlog_info_latency << watch.elapsed_time_microseconds();
} else if (method == "get_segment_file") {
handle_get_segment_file(_engine, req, _rate_limit_group.get());
g_get_segment_file_latency << watch.elapsed_time_microseconds();
} else if (method == "get_segment_index_file") {
handle_get_segment_index_file(_engine, req, _rate_limit_group.get());
g_get_segment_index_file_latency << watch.elapsed_time_microseconds();
} else if (method == "get_rowset_meta") {
handle_get_rowset_meta(_engine, req);
g_get_rowset_meta_latency << watch.elapsed_time_microseconds();
} else {
auto error_msg = fmt::format("invalid method: {}", method);
LOG(WARNING) << error_msg;
HttpChannel::send_reply(req, HttpStatus::NOT_IMPLEMENTED, error_msg);
}
}
Status DownloadBinlogAction::_check_token(HttpRequest* req) {
const std::string& token_str = req->param(kTokenParameter);
if (token_str.empty()) {
return Status::InternalError("token is not specified.");
}
const std::string& local_token = _exec_env->token();
if (token_str != local_token) {
LOG(WARNING) << "invalid download token: " << token_str << ", local token: " << local_token;
return Status::NotAuthorized("invalid token {}", token_str);
}
return Status::OK();
}
} // end namespace doris