blob: 88791eb5e32aa23b5abc9aab79664d13592430d9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "backup_types.h"
#include "block_service/block_service.h"
#include "block_service/block_service_manager.h"
#include "common/backup_common.h"
#include "common/gpid.h"
#include "common/json_helper.h"
#include "common/replication.codes.h"
#include "dsn.layer2_types.h"
#include "meta/backup_engine.h"
#include "meta/meta_backup_service.h"
#include "meta/meta_data.h"
#include "meta/meta_service.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_holder.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_tracker.h"
#include "server_state.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/chrono_literals.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/zlocks.h"
namespace dsn {
namespace replication {
// Creates an engine bound to `service`. No block service is selected yet, the
// backup path is empty and the failure flag is cleared.
backup_engine::backup_engine(backup_service *service)
    : _backup_service(service),
      _block_service(nullptr),
      _backup_path(),
      _is_backup_failed(false)
{
}
// Cancels every task still registered with `_tracker`. The tasks and RPC
// callbacks this engine enqueues capture `this`, so they are cancelled here.
backup_engine::~backup_engine()
{
    _tracker.cancel_outstanding_tasks();
}
// Prepares the engine for a new backup of the app identified by `app_id`.
//
// Returns ERR_INVALID_STATE when the app does not exist or is not AS_AVAILABLE.
// Otherwise the per-partition status table is rebuilt with one UNALIVE entry per
// partition, `_cur_backup` is filled in and ERR_OK is returned. The current time
// in milliseconds serves both as the backup id and as the start time.
error_code backup_engine::init_backup(int32_t app_id)
{
    std::string name;
    int partitions = 0;
    {
        // Copy what we need out of the app while the state read lock is held.
        zauto_read_lock state_guard;
        _backup_service->get_state()->lock_read(state_guard);
        const std::shared_ptr<app_state> app = _backup_service->get_state()->get_app(app_id);
        const bool available = (app != nullptr) && (app->status == app_status::AS_AVAILABLE);
        if (!available) {
            LOG_ERROR("app {} is not available, couldn't do backup now.", app_id);
            return ERR_INVALID_STATE;
        }
        name = app->app_name;
        partitions = app->partition_count;
    }

    zauto_lock guard(_lock);
    _backup_status.clear();
    for (int index = 0; index < partitions; ++index) {
        _backup_status.emplace(index, backup_status::UNALIVE);
    }

    _cur_backup.app_id = app_id;
    _cur_backup.app_name = name;
    const int64_t stamp_ms = static_cast<int64_t>(dsn_now_ms());
    _cur_backup.backup_id = stamp_ms;
    _cur_backup.start_time_ms = stamp_ms;
    return ERR_OK;
}
// Remembers `provider` as the provider type and asks the meta service's block
// service manager for the matching block filesystem.
// Returns ERR_OK when a filesystem was obtained, ERR_INVALID_PARAMETERS otherwise.
// Note that `_provider_type` is updated even when the lookup fails.
error_code backup_engine::set_block_service(const std::string &provider)
{
    _provider_type = provider;

    auto &&manager = _backup_service->get_meta_service()->get_block_service_manager();
    _block_service = manager.get_or_create_block_filesystem(provider);

    if (_block_service != nullptr) {
        return ERR_OK;
    }
    return ERR_INVALID_PARAMETERS;
}
// Stores `path` as the backup path of this engine.
// Returns ERR_INVALID_PARAMETERS (leaving the path untouched) when a block service
// is selected and reports that its root path is already set; ERR_OK otherwise.
error_code backup_engine::set_backup_path(const std::string &path)
{
    const bool root_already_set = _block_service && _block_service->is_root_path_set();
    if (root_already_set) {
        return ERR_INVALID_PARAMETERS;
    }

    LOG_INFO("backup path is set to {}.", path);
    _backup_path = path;
    return ERR_OK;
}
// Creates `file_name` on the selected block service and writes `write_buffer` to it.
//
// Both the create and the write are issued with TASK_CODE_EXEC_INLINED and waited
// on, so this call returns only after each step has reported its result.
// Returns the create error if the file could not be created, otherwise the error
// reported by the write.
error_code backup_engine::write_backup_file(const std::string &file_name,
                                            const dsn::blob &write_buffer)
{
    namespace bs = dist::block_service;

    bs::create_file_request create_req;
    create_req.ignore_metadata = true;
    create_req.file_name = file_name;

    dsn::error_code result;
    bs::block_file_ptr handle;

    // Step 1: create the remote file and fetch its handle.
    auto on_created = [&result, &handle](const bs::create_file_response &resp) {
        result = resp.err;
        handle = resp.file_handle;
    };
    _block_service->create_file(create_req, TASK_CODE_EXEC_INLINED, on_created)->wait();
    if (result != dsn::ERR_OK) {
        LOG_INFO("create file {} failed", file_name);
        return result;
    }
    CHECK_NOTNULL(handle, "create file {} succeed, but can't get handle", create_req.file_name);

    // Step 2: write the buffer; `result` now carries the write outcome.
    auto on_written = [&result](const bs::write_response &resp) { result = resp.err; };
    handle->write(bs::write_request{write_buffer}, TASK_CODE_EXEC_INLINED, on_written)->wait();
    return result;
}
// Serializes the app info of the app being backed up and writes it to the app
// metadata file of the current backup.
// Returns ERR_INVALID_STATE when the app is missing or not AS_AVAILABLE, otherwise
// whatever write_backup_file() returns.
error_code backup_engine::backup_app_meta()
{
    dsn::blob encoded_app_info;
    {
        zauto_read_lock state_guard;
        _backup_service->get_state()->lock_read(state_guard);

        const std::shared_ptr<app_state> app =
            _backup_service->get_state()->get_app(_cur_backup.app_id);
        const bool available = (app != nullptr) && (app->status == app_status::AS_AVAILABLE);
        if (!available) {
            LOG_ERROR("app {} is not available, couldn't do backup now.", _cur_backup.app_id);
            return ERR_INVALID_STATE;
        }

        // Work on a copy so the live app state is left untouched.
        app_state snapshot = *app;
        // App envs are not restored, so they are not written to the backup file.
        // TODO(zhangyifan): backup and restore app envs when needed.
        snapshot.envs.clear();
        encoded_app_info = dsn::json::json_forwarder<app_info>::encode(snapshot);
    }

    const std::string root =
        dsn::utils::filesystem::path_combine(_backup_path, _backup_service->backup_root());
    const std::string meta_file = cold_backup::get_app_metadata_file(
        root, _cur_backup.app_name, _cur_backup.app_id, _cur_backup.backup_id);
    return write_backup_file(meta_file, encoded_app_info);
}
// Sends a cold-backup request for partition `pid` to its current primary.
//
// - If the app is missing or not AS_AVAILABLE, the whole backup is flagged as
//   failed and nothing is sent.
// - If the partition currently has no valid primary, the same call is re-enqueued
//   with a 10 second delay.
// - Otherwise a backup_request is built from the current backup's id, app name,
//   provider type, policy name and (if set) backup path, and sent with a 10000 ms
//   timeout. The reply is delivered to on_backup_reply().
void backup_engine::backup_app_partition(const gpid &pid)
{
    dsn::rpc_address partition_primary;
    {
        zauto_read_lock l;
        _backup_service->get_state()->lock_read(l);
        std::shared_ptr<app_state> app = _backup_service->get_state()->get_app(pid.get_app_id());
        if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
            LOG_ERROR("app {} is not available, couldn't do backup now.", pid.get_app_id());

            zauto_lock lock(_lock);
            _is_backup_failed = true;
            return;
        }
        partition_primary = app->partitions[pid.get_partition_index()].primary;
    }

    if (partition_primary.is_invalid()) {
        LOG_WARNING(
            "backup_id({}): partition {} doesn't have a primary now, retry to backup it later.",
            _cur_backup.backup_id,
            pid);
        tasking::enqueue(LPC_DEFAULT_CALLBACK,
                         &_tracker,
                         [this, pid]() { backup_app_partition(pid); },
                         0,
                         std::chrono::seconds(10));
        return;
    }

    auto req = std::make_unique<backup_request>();
    req->pid = pid;
    policy_info backup_policy_info;
    backup_policy_info.__set_backup_provider_type(_provider_type);
    backup_policy_info.__set_policy_name(get_policy_name());
    req->policy = backup_policy_info;
    req->backup_id = _cur_backup.backup_id;
    req->app_name = _cur_backup.app_name;
    if (!_backup_path.empty()) {
        req->__set_backup_path(_backup_path);
    }

    LOG_INFO("backup_id({}): send backup request to partition {}, target_addr = {}",
             _cur_backup.backup_id,
             pid,
             partition_primary);

    // Mark the partition ALIVE *before* the request goes out. The reply callback
    // updates the same entry (COMPLETED / FAILED) under `_lock`; if it were to run
    // before a post-send ALIVE write, that write would overwrite the final status
    // and complete_current_backup() could never see every partition as COMPLETED.
    {
        zauto_lock l(_lock);
        _backup_status[pid.get_partition_index()] = backup_status::ALIVE;
    }

    backup_rpc rpc(std::move(req), RPC_COLD_BACKUP, 10000_ms, 0, pid.thread_hash());
    rpc.call(
        partition_primary, &_tracker, [this, rpc, pid, partition_primary](error_code err) mutable {
            on_backup_reply(err, rpc.response(), pid, partition_primary);
        });
}
// Records that the replica reported a failed backup for partition `pid`.
// The response must belong to `pid` and to the current backup id (checked).
// One failed partition fails the whole backup, so the global failure flag is set
// together with the partition's FAILED status.
inline void backup_engine::handle_replica_backup_failed(const backup_response &response,
                                                        const gpid pid)
{
    CHECK_EQ(response.pid, pid);
    CHECK_EQ(response.backup_id, _cur_backup.backup_id);

    LOG_ERROR("backup_id({}): backup for partition {} failed, response.err: {}",
              _cur_backup.backup_id,
              pid,
              response.err);

    const auto partition_index = pid.get_partition_index();

    zauto_lock guard(_lock);
    _backup_status[partition_index] = backup_status::FAILED;
    // if one partition fail, the whole backup plan fail.
    _is_backup_failed = true;
}
// Schedules another backup_app_partition(pid) call on the engine's tracker,
// delayed by one second.
inline void backup_engine::retry_backup(const dsn::gpid pid)
{
    const std::chrono::seconds retry_delay(1);
    auto resend = [this, pid]() { backup_app_partition(pid); };
    tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, std::move(resend), 0, retry_delay);
}
// Handles the reply (or RPC failure) of a backup request sent to `primary` for
// partition `pid`.
//
// Replies are ignored once the backup has been flagged as failed. Otherwise:
//   - ERR_LOCAL_APP_FAILURE          -> the partition, and thus the backup, failed;
//   - any other error                -> resend the request later;
//   - ERR_OK with PROGRESS_FINISHED  -> partition completed, try to finish the backup;
//   - ERR_OK with any other progress -> keep polling by resending the request.
// See replica::on_cold_backup() for the replica side of this protocol.
void backup_engine::on_backup_reply(const error_code err,
                                    const backup_response &response,
                                    const gpid pid,
                                    const rpc_address &primary)
{
    // if backup of some partition failed, we would not handle response from other partitions.
    bool already_failed = false;
    {
        zauto_lock guard(_lock);
        already_failed = _is_backup_failed;
    }
    if (already_failed) {
        return;
    }

    // An RPC-level error takes precedence; otherwise use the error in the response.
    const auto effective_err = (err != ERR_OK) ? err : response.err;

    if (effective_err == ERR_LOCAL_APP_FAILURE) {
        handle_replica_backup_failed(response, pid);
        return;
    }

    if (effective_err != ERR_OK) {
        LOG_ERROR("backup_id({}): backup request to server {} failed, error: {}, retry to "
                  "send backup request.",
                  _cur_backup.backup_id,
                  primary,
                  effective_err);
        retry_backup(pid);
        return;
    }

    if (response.progress != cold_backup_constant::PROGRESS_FINISHED) {
        // backup is not finished, meta polling to send request
        LOG_INFO("backup_id({}): receive backup response for partition {} from server {}, now "
                 "progress {}, retry to send backup request.",
                 _cur_backup.backup_id,
                 pid,
                 primary,
                 response.progress);
        retry_backup(pid);
        return;
    }

    // The replica finished this partition.
    CHECK_EQ(response.pid, pid);
    CHECK_EQ(response.backup_id, _cur_backup.backup_id);
    LOG_INFO("backup_id({}): backup for partition {} completed.", _cur_backup.backup_id, pid);
    {
        zauto_lock guard(_lock);
        _backup_status[pid.get_partition_index()] = backup_status::COMPLETED;
    }
    complete_current_backup();
}
void backup_engine::write_backup_info()
{
std::string backup_root =
dsn::utils::filesystem::path_combine(_backup_path, _backup_service->backup_root());
std::string file_name = cold_backup::get_backup_info_file(backup_root, _cur_backup.backup_id);
blob buf = dsn::json::json_forwarder<app_backup_info>::encode(_cur_backup);
error_code err = write_backup_file(file_name, buf);
if (err == ERR_FS_INTERNAL) {
LOG_ERROR(
"backup_id({}): write backup info failed, error {}, do not try again for this error.",
_cur_backup.backup_id,
err);
zauto_lock l(_lock);
_is_backup_failed = true;
return;
}
if (err != ERR_OK) {
LOG_WARNING("backup_id({}): write backup info failed, retry it later.",
_cur_backup.backup_id);
tasking::enqueue(LPC_DEFAULT_CALLBACK,
&_tracker,
[this]() { write_backup_info(); },
0,
std::chrono::seconds(1));
return;
}
LOG_INFO("backup_id({}): successfully wrote backup info, backup for app {} completed.",
_cur_backup.backup_id,
_cur_backup.app_id);
zauto_lock l(_lock);
_cur_backup.end_time_ms = dsn_now_ms();
}
// Writes the backup info file once every partition has reached COMPLETED;
// does nothing while at least one partition is still in another state.
void backup_engine::complete_current_backup()
{
    bool all_completed = true;
    {
        zauto_lock guard(_lock);
        for (const auto &entry : _backup_status) {
            if (entry.second != backup_status::COMPLETED) {
                // backup for some partition was not finished.
                all_completed = false;
                break;
            }
        }
    }

    if (!all_completed) {
        return;
    }

    // complete backup for all partitions.
    write_backup_info();
}
// Starts the backup prepared by init_backup().
//
// First writes the app metadata file; if that fails the error is returned and no
// partition task is scheduled. Otherwise one task per partition is enqueued on the
// engine's tracker, each calling backup_app_partition(), and ERR_OK is returned.
error_code backup_engine::start()
{
    const error_code err = backup_app_meta();
    if (err != ERR_OK) {
        LOG_ERROR("backup_id({}): backup meta data for app {} failed, error {}",
                  _cur_backup.backup_id,
                  _cur_backup.app_id,
                  err);
        return err;
    }

    // Read the partition count once, under the lock: tasks enqueued below write to
    // `_backup_status` while holding `_lock`, so re-reading the map unguarded on
    // every iteration would race with them. Using a signed count also avoids the
    // signed/unsigned comparison against size().
    int partition_count = 0;
    {
        zauto_lock l(_lock);
        partition_count = static_cast<int>(_backup_status.size());
    }

    for (int i = 0; i < partition_count; ++i) {
        tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, [this, i]() {
            backup_app_partition(gpid(_cur_backup.app_id, i));
        });
    }
    return ERR_OK;
}
// A backup is in progress as long as no end time has been recorded and it has not
// been flagged as failed.
bool backup_engine::is_in_progress() const
{
    zauto_lock guard(_lock);
    const bool has_ended = _cur_backup.end_time_ms != 0;
    return !(has_ended || _is_backup_failed);
}
// Returns a snapshot of the current backup (id, app name, provider type, path,
// start/end time and failure flag), taken while holding the engine lock.
backup_item backup_engine::get_backup_item() const
{
    backup_item snapshot;

    zauto_lock guard(_lock);
    // Identity of the backup.
    snapshot.backup_id = _cur_backup.backup_id;
    snapshot.app_name = _cur_backup.app_name;
    // Where it is stored.
    snapshot.backup_provider_type = _provider_type;
    snapshot.backup_path = _backup_path;
    // Timing and outcome.
    snapshot.start_time_ms = _cur_backup.start_time_ms;
    snapshot.end_time_ms = _cur_backup.end_time_ms;
    snapshot.is_backup_failed = _is_backup_failed;
    return snapshot;
}
} // namespace replication
} // namespace dsn