blob: 2e0996997d8ba402425fe6c20bcc9f0b8136823d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "http/action/tablet_migration_action.h"
#include <glog/logging.h>
#include <exception>
#include <string>
#include "common/config.h"
#include "common/status.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/data_dir.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/task/engine_storage_migration_task.h"
namespace doris {
#include "common/compile_check_begin.h"
// Content-Type value attached to every reply produced by this action.
static const std::string HEADER_JSON = "application/json";
void TabletMigrationAction::_init_migration_action() {
int32_t max_thread_num = config::max_tablet_migration_threads;
int32_t min_thread_num = config::min_tablet_migration_threads;
THROW_IF_ERROR(ThreadPoolBuilder("MigrationTaskThreadPool")
.set_min_threads(min_thread_num)
.set_max_threads(max_thread_num)
.build(&_migration_thread_pool));
}
// Entry point for a tablet-migration HTTP request. The reply is always sent with
// HttpStatus::OK and a JSON body; failures are reported inside the body via
// Status::to_json().
//
// Request parameters are extracted by _check_param(), which yields the tablet id,
// schema hash, destination disk and a goal of either "run" or "status":
//   - "run":    validates the request with _check_migrate_request(), records the task
//               in _migration_tasks as "submitted" and submits it to
//               _migration_thread_pool. The worker marks it "running", executes
//               _execute_tablet_migration(), removes it from _migration_tasks and
//               appends the outcome to _finished_migration_tasks.
//   - "status": reports the state of a task found in _migration_tasks, otherwise the
//               outcome of the most recently finished matching task, otherwise NotFound.
//
// Both task containers are only touched while holding _migration_status_mutex.
void TabletMigrationAction::handle(HttpRequest* req) {
    int64_t tablet_id = 0;
    unsigned long schema_hash = 0;
    std::string dest_disk;
    std::string goal;
    std::string status_result;

    Status status = _check_param(req, tablet_id, schema_hash, dest_disk, goal);
    if (status.ok() && goal == "run") {
        MigrationTask current_task(tablet_id, schema_hash, dest_disk);
        TabletSharedPtr tablet;
        // Initialised so that a failed check never leaves an indeterminate pointer around.
        DataDir* dest_store = nullptr;
        status = _check_migrate_request(tablet_id, schema_hash, dest_disk, tablet, &dest_store);

        if (status.ok()) {
            // Register the task; the lock is released at the end of this scope, before
            // the task is handed to the thread pool.
            std::unique_lock<std::mutex> lock(_migration_status_mutex);
            auto it_task = _migration_tasks.find(current_task);
            if (it_task != _migration_tasks.end()) {
                status = Status::AlreadyExist(
                        "There is a migration task for this tablet already exists. "
                        "dest_disk is {} .",
                        (it_task->first)._dest_disk);
            } else {
                _migration_tasks[current_task] = "submitted";
            }
        }

        if (status.ok()) {
            // The worker runs after this function has returned, so everything it needs is
            // captured by value; only `this` is shared with the request thread.
            auto st = _migration_thread_pool->submit_func(
                    [this, tablet, dest_store, current_task]() {
                        {
                            std::unique_lock<std::mutex> lock(_migration_status_mutex);
                            _migration_tasks[current_task] = "running";
                        }
                        Status result_status = _execute_tablet_migration(tablet, dest_store);
                        {
                            std::unique_lock<std::mutex> lock(_migration_status_mutex);
                            _migration_tasks.erase(current_task);
                            // Drop the oldest record once the history is full. The empty()
                            // guard keeps pop_front() well-defined when the configured
                            // limit is 0.
                            if (!_finished_migration_tasks.empty() &&
                                _finished_migration_tasks.size() >=
                                        config::finished_migration_tasks_size) {
                                _finished_migration_tasks.pop_front();
                            }
                            _finished_migration_tasks.push_back(
                                    std::make_pair(current_task, result_status));
                        }
                    });
            if (!st.ok()) {
                // Keep the underlying reason in the log; the reply text stays unchanged.
                LOG(WARNING) << "failed to submit migration task. tablet_id=" << tablet_id
                             << ", dest_disk=" << dest_disk << ", status:" << st;
                status = Status::InternalError("Migration task submission failed");
                // The worker will never run, so undo the registration made above.
                std::unique_lock<std::mutex> lock(_migration_status_mutex);
                _migration_tasks.erase(current_task);
            }
        }

        if (status.ok()) {
            status_result =
                    "{\"status\": \"Success\", \"msg\": \"migration task is successfully "
                    "submitted.\"}";
        }
    } else if (status.ok()) {
        DCHECK(goal == "status");
        MigrationTask current_task(tablet_id, schema_hash);

        std::unique_lock<std::mutex> lock(_migration_status_mutex);
        auto it_task = _migration_tasks.find(current_task);
        if (it_task != _migration_tasks.end()) {
            // Task is still submitted or running.
            status_result = "{\"status\": \"Success\", \"msg\": \"migration task is " +
                            it_task->second + "\", \"dest_disk\": \"" +
                            (it_task->first)._dest_disk + "\"}";
        } else {
            // Walk the history from newest to oldest so the latest outcome wins.
            bool found = false;
            for (auto it = _finished_migration_tasks.rbegin();
                 it != _finished_migration_tasks.rend(); ++it) {
                const MigrationTask& finished_task = it->first;
                if (finished_task._tablet_id == tablet_id &&
                    finished_task._schema_hash == schema_hash) {
                    status = it->second;
                    if (status.ok()) {
                        status_result =
                                "{\"status\": \"Success\", \"msg\": \"migration task has "
                                "finished successfully\", \"dest_disk\": \"" +
                                finished_task._dest_disk + "\"}";
                    }
                    found = true;
                    break;
                }
            }
            if (!found) {
                status = Status::NotFound("Migration task not found");
            }
        }
    }

    // Single reply path: any failure collected above overrides the body.
    if (!status.ok()) {
        status_result = status.to_json();
    }
    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
    HttpChannel::send_reply(req, HttpStatus::OK, status_result);
}
// Reads the "tablet_id", "schema_hash", "disk" and "goal" request parameters into the
// output arguments.
//
// Returns InternalError when tablet_id or schema_hash is not a number, is out of range,
// or is followed by extra characters (std::stoull/std::stoul alone would silently accept
// input such as "12abc" as 12), and when goal is neither "run" nor "status".
// Returns OK otherwise. On error the output arguments may be partially updated.
Status TabletMigrationAction::_check_param(HttpRequest* req, int64_t& tablet_id,
                                           unsigned long& schema_hash, std::string& dest_disk,
                                           std::string& goal) {
    const std::string& req_tablet_id = req->param("tablet_id");
    const std::string& req_schema_hash = req->param("schema_hash");

    // Number of characters consumed by each conversion.
    size_t tablet_id_len = 0;
    size_t schema_hash_len = 0;
    try {
        tablet_id = std::stoull(req_tablet_id, &tablet_id_len);
        schema_hash = std::stoul(req_schema_hash, &schema_hash_len);
    } catch (const std::exception& e) {
        return Status::InternalError(
                "Convert failed:{}, invalid argument.tablet_id: {}, schema_hash: {}", e.what(),
                req_tablet_id, req_schema_hash);
    }
    // Reject values that only start with a number.
    if (tablet_id_len != req_tablet_id.size() || schema_hash_len != req_schema_hash.size()) {
        return Status::InternalError(
                "Convert failed:{}, invalid argument.tablet_id: {}, schema_hash: {}",
                "unexpected trailing characters", req_tablet_id, req_schema_hash);
    }

    dest_disk = req->param("disk");
    goal = req->param("goal");
    if (goal != "run" && goal != "status") {
        return Status::InternalError("invalid goal argument.");
    }
    return Status::OK();
}
// Validates that the tablet identified by tablet_id can be migrated to dest_disk.
//
// Outputs: `tablet` receives the tablet looked up through the tablet manager and
// `*dest_store` receives the store returned by _engine.get_store(dest_disk).
// Returns NotFound when the tablet or the disk is unknown, AlreadyExist when the tablet
// already lives on the destination store, EXCEEDED_LIMIT when the destination reports
// that it would reach its capacity limit, and OK otherwise.
// schema_hash is accepted but not consulted by this check.
Status TabletMigrationAction::_check_migrate_request(int64_t tablet_id, unsigned long schema_hash,
                                                     std::string dest_disk, TabletSharedPtr& tablet,
                                                     DataDir** dest_store) {
    tablet = _engine.tablet_manager()->get_tablet(tablet_id);
    if (tablet == nullptr) {
        LOG(WARNING) << "no tablet for tablet_id:" << tablet_id;
        return Status::NotFound("Tablet not found");
    }

    // Resolve the data dir named in the request and publish it to the caller.
    DataDir* const target_store = _engine.get_store(dest_disk);
    *dest_store = target_store;
    if (target_store == nullptr) {
        LOG(WARNING) << "data dir not found: " << dest_disk;
        return Status::NotFound("Disk not found");
    }

    if (tablet->data_dir() == target_store) {
        LOG(WARNING) << "tablet already exist in destine disk: " << dest_disk;
        return Status::AlreadyExist("Tablet already exist in destination disk");
    }

    // Make sure the destination has room for the tablet's local data.
    const int64_t required_bytes = tablet->tablet_local_size();
    if (!target_store->reach_capacity_limit(required_bytes)) {
        return Status::OK();
    }
    LOG(WARNING) << "reach the capacity limit of path: " << target_store->path()
                 << ", tablet size: " << required_bytes;
    return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
            "reach the capacity limit of path {}, tablet_size={}", target_store->path(),
            required_bytes);
}
// Runs an EngineStorageMigrationTask that moves `tablet` to `dest_store`, logs the
// outcome (INFO on success, WARNING on failure) and returns the task's status.
Status TabletMigrationAction::_execute_tablet_migration(TabletSharedPtr tablet,
                                                        DataDir* dest_store) {
    // Identification fields are read before the task runs and are used for logging only.
    const int64_t moved_tablet_id = tablet->tablet_id();
    const int32_t moved_schema_hash = tablet->schema_hash();
    const std::string target_path = dest_store->path();

    EngineStorageMigrationTask migration(_engine, tablet, dest_store);
    SCOPED_ATTACH_TASK(migration.mem_tracker());

    Status outcome = migration.execute();
    if (outcome.ok()) {
        LOG(INFO) << "tablet migrate success. tablet_id=" << moved_tablet_id
                  << ", schema_hash=" << moved_schema_hash << ", dest_disk=" << target_path;
    } else {
        LOG(WARNING) << "tablet migrate failed. tablet_id=" << moved_tablet_id
                     << ", schema_hash=" << moved_schema_hash << ", dest_disk=" << target_path
                     << ", status:" << outcome;
    }
    return outcome;
}
#include "common/compile_check_end.h"
} // namespace doris