blob: 67999c2d09ab2b6824db909320af8dc325143f29 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "http/action/download_action.h"
#include <memory>
#include <string>
#include <utility>
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "http/http_channel.h"
#include "http/http_request.h"
#include "http/utils.h"
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
namespace doris {
namespace {
const std::string FILE_PARAMETER = "file";
const std::string TOKEN_PARAMETER = "token";
const std::string CHANNEL_PARAMETER = "channel";
const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
const std::string ACQUIRE_MD5_PARAMETER = "acquire_md5";
} // namespace
DownloadAction::DownloadAction(ExecEnv* exec_env,
std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group,
const std::vector<std::string>& allow_dirs, int32_t num_workers)
: HttpHandlerWithAuth(exec_env),
_download_type(NORMAL),
_num_workers(num_workers),
_rate_limit_group(std::move(rate_limit_group)) {
for (const auto& dir : allow_dirs) {
std::string p;
Status st = io::global_local_filesystem()->canonicalize(dir, &p);
if (!st.ok()) {
continue;
}
_allow_paths.emplace_back(std::move(p));
}
if (_num_workers > 0) {
// for single-replica-load
static_cast<void>(ThreadPoolBuilder("DownloadThreadPool")
.set_min_threads(num_workers)
.set_max_threads(num_workers)
.build(&_download_workers));
}
}
DownloadAction::DownloadAction(ExecEnv* exec_env, const std::string& error_log_root_dir)
: HttpHandlerWithAuth(exec_env), _download_type(ERROR_LOG), _num_workers(0) {
#ifndef BE_TEST
static_cast<void>(
io::global_local_filesystem()->canonicalize(error_log_root_dir, &_error_log_root_dir));
#endif
}
void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_param) {
// check token
Status status;
if (config::enable_token_check) {
status = check_token(req);
if (!status.ok()) {
std::string error_msg = status.to_string();
if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg);
return;
} else {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
return;
}
}
}
status = check_path_is_allowed(file_param);
if (!status.ok()) {
std::string error_msg = status.to_string();
if (status.is<ErrorCode::NOT_FOUND>() || status.is<ErrorCode::IO_ERROR>()) {
HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, error_msg);
return;
} else if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg);
return;
} else {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
return;
}
}
bool is_dir = false;
status = io::global_local_filesystem()->is_directory(file_param, &is_dir);
if (!status.ok()) {
HttpChannel::send_reply(req, status.to_string());
return;
}
if (is_dir) {
do_dir_response(file_param, req);
} else {
const auto& channel = req->param(CHANNEL_PARAMETER);
bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
bool is_acquire_md5 = !req->param(ACQUIRE_MD5_PARAMETER).empty();
auto* rate_limit_group = ingest_binlog ? _rate_limit_group.get() : nullptr;
do_file_response(file_param, req, rate_limit_group, is_acquire_md5);
}
}
void DownloadAction::handle_error_log(HttpRequest* req, const std::string& file_param) {
const std::string absolute_path = _error_log_root_dir + "/" + file_param;
Status status = check_log_path_is_allowed(absolute_path);
if (!status.ok()) {
std::string error_msg = status.to_string();
if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg);
return;
} else {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
return;
}
}
bool is_dir = false;
status = io::global_local_filesystem()->is_directory(absolute_path, &is_dir);
if (!status.ok()) {
std::string error_msg = status.to_string();
if (status.is<ErrorCode::NOT_FOUND>() || status.is<ErrorCode::IO_ERROR>()) {
HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, error_msg);
return;
} else if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg);
return;
} else {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
return;
}
}
if (is_dir) {
std::string error_msg = "error log can only be file.";
HttpChannel::send_reply(req, error_msg);
return;
}
do_file_response(absolute_path, req);
}
void DownloadAction::handle(HttpRequest* req) {
if (_num_workers > 0) {
// async for heavy download job, currently mainly for single-replica-load
auto status = _download_workers->submit_func([this, req]() { _handle(req); });
if (!status.ok()) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string());
}
} else {
_handle(req);
}
}
void DownloadAction::_handle(HttpRequest* req) {
VLOG_CRITICAL << "accept one download request " << req->debug_string();
// Get 'file' parameter, then assembly file absolute path
const std::string& file_path = req->param(FILE_PARAMETER);
if (file_path.empty()) {
std::string error_msg =
std::string("parameter " + FILE_PARAMETER + " not specified in url.");
HttpChannel::send_reply(req, error_msg);
return;
}
if (_download_type == ERROR_LOG) {
handle_error_log(req, file_path);
} else if (_download_type == NORMAL) {
handle_normal(req, file_path);
}
VLOG_CRITICAL << "deal with download request finished! ";
}
Status DownloadAction::check_token(HttpRequest* req) {
const std::string& token_str = req->param(TOKEN_PARAMETER);
if (token_str.empty()) {
return Status::NotAuthorized("token is not specified.");
}
const std::string& local_token = _exec_env->token();
if (token_str != local_token) {
LOG(WARNING) << "invalid download token: " << token_str << ", local token: " << local_token;
return Status::NotAuthorized("invalid token {}", token_str);
}
return Status::OK();
}
Status DownloadAction::check_path_is_allowed(const std::string& file_path) {
DCHECK_EQ(_download_type, NORMAL);
std::string canonical_file_path;
RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(file_path, &canonical_file_path));
for (auto& allow_path : _allow_paths) {
if (io::LocalFileSystem::contain_path(allow_path, canonical_file_path)) {
return Status::OK();
}
}
return Status::NotAuthorized("file path is not allowed: {}", canonical_file_path);
}
Status DownloadAction::check_log_path_is_allowed(const std::string& file_path) {
DCHECK_EQ(_download_type, ERROR_LOG);
std::string canonical_file_path;
RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(file_path, &canonical_file_path));
if (io::LocalFileSystem::contain_path(_error_log_root_dir, canonical_file_path)) {
return Status::OK();
}
return Status::NotAuthorized("file path is not allowed: {}", file_path);
}
} // end namespace doris