feat(backup): 1. remove previous implementation (#1956)
This PR removes part of the previous backup implementation, because the enhanced code
differs substantially from the current one. In addition, this PR also disables the related
unit tests and function tests.
diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp
index ff25050..7f77088 100644
--- a/src/meta/meta_backup_service.cpp
+++ b/src/meta/meta_backup_service.cpp
@@ -16,42 +16,12 @@
// under the License.
#include <absl/strings/string_view.h>
-#include <boost/cstdint.hpp>
-#include <boost/lexical_cast.hpp>
-#include <fmt/core.h>
-#include <algorithm>
-#include <iterator>
-#include <type_traits>
-#include <utility>
-#include "block_service/block_service.h"
-#include "block_service/block_service_manager.h"
-#include "common/backup_common.h"
-#include "common/replication.codes.h"
-#include "common/replication_enums.h"
-#include "dsn.layer2_types.h"
-#include "meta/backup_engine.h"
-#include "meta/meta_data.h"
#include "meta/meta_rpc_types.h"
-#include "meta/meta_state_service.h"
#include "meta_backup_service.h"
#include "meta_service.h"
-#include "runtime/api_layer1.h"
-#include "runtime/rpc/rpc_host_port.h"
-#include "runtime/rpc/rpc_holder.h"
-#include "runtime/rpc/rpc_message.h"
-#include "runtime/rpc/serialization.h"
-#include "security/access_controller.h"
-#include "runtime/task/async_calls.h"
-#include "runtime/task/task_code.h"
-#include "server_state.h"
-#include "utils/autoref_ptr.h"
-#include "utils/blob.h"
-#include "utils/chrono_literals.h"
-#include "utils/defer.h"
#include "utils/flags.h"
-#include "utils/fmt_logging.h"
-#include "utils/time_utils.h"
+#include "utils/metrics.h"
DSN_DECLARE_int32(cold_backup_checkpoint_reserve_minutes);
DSN_DECLARE_int32(fd_lease_seconds);
@@ -65,1747 +35,22 @@
namespace dsn {
namespace replication {
-namespace {
-
-metric_entity_ptr instantiate_backup_policy_metric_entity(const std::string &policy_name)
-{
- auto entity_id = fmt::format("backup_policy@{}", policy_name);
-
- return METRIC_ENTITY_backup_policy.instantiate(entity_id, {{"policy_name", policy_name}});
-}
-
-bool validate_backup_interval(int64_t backup_interval_seconds, std::string &hint_message)
-{
- // The backup interval must be larger than checkpoint reserve time.
- // Or the next cold backup checkpoint may be cleared by the clear operation.
- if (backup_interval_seconds <= FLAGS_cold_backup_checkpoint_reserve_minutes * 60) {
- hint_message = fmt::format(
- "backup interval must be larger than cold_backup_checkpoint_reserve_minutes={}",
- FLAGS_cold_backup_checkpoint_reserve_minutes);
- return false;
- }
-
- // There is a bug occurred in backup if the backup interval is less than 1 day, this is a
- // temporary resolution, the long term plan is to remove periodic backup.
- // See details https://github.com/apache/incubator-pegasus/issues/1081.
- if (backup_interval_seconds < 86400) {
- hint_message = fmt::format("backup interval must be >= 86400 (1 day)");
- return false;
- }
-
- return true;
-}
-
-} // anonymous namespace
-
-backup_policy_metrics::backup_policy_metrics(const std::string &policy_name)
- : _policy_name(policy_name),
- _backup_policy_metric_entity(instantiate_backup_policy_metric_entity(policy_name)),
- METRIC_VAR_INIT_backup_policy(backup_recent_duration_ms)
-{
-}
-
-const metric_entity_ptr &backup_policy_metrics::backup_policy_metric_entity() const
-{
- CHECK_NOTNULL(_backup_policy_metric_entity,
- "backup_policy metric entity (policy_name={}) should has been instantiated: "
- "uninitialized entity cannot be used to instantiate metric",
- _policy_name);
- return _backup_policy_metric_entity;
-}
-
-// TODO: backup_service and policy_context should need two locks, its own _lock and server_state's
-// _lock this maybe lead to deadlock, should refactor this
-
-void policy_context::start_backup_app_meta_unlocked(int32_t app_id)
-{
- server_state *state = _backup_service->get_state();
- dsn::blob buffer;
- bool app_available = false;
- {
- zauto_read_lock l;
- state->lock_read(l);
- const std::shared_ptr<app_state> &app = state->get_app(app_id);
- if (app != nullptr && app->status == app_status::AS_AVAILABLE) {
- app_available = true;
- // do not persistent envs to backup file
- if (app->envs.empty()) {
- buffer = dsn::json::json_forwarder<app_info>::encode(*app);
- } else {
- app_state tmp = *app;
- tmp.envs.clear();
- buffer = dsn::json::json_forwarder<app_info>::encode(tmp);
- }
- }
- }
-
- // if app is dropped when app is under backuping, we just skip backup this app this time, and
- // also we will not write backup-finish-flag on fds
- if (!app_available) {
- LOG_WARNING(
- "{}: can't encode app_info for app({}), perhaps removed, treat it as backup finished",
- _backup_sig,
- app_id);
- auto iter = _progress.unfinished_partitions_per_app.find(app_id);
- CHECK(iter != _progress.unfinished_partitions_per_app.end(),
- "{}: can't find app({}) in unfished_map",
- _backup_sig,
- app_id);
- _progress.is_app_skipped[app_id] = true;
- int total_partitions = iter->second;
- for (int32_t pidx = 0; pidx < total_partitions; ++pidx) {
- update_partition_progress_unlocked(
- gpid(app_id, pidx), cold_backup_constant::PROGRESS_FINISHED, dsn::host_port());
- }
- return;
- }
-
- dist::block_service::create_file_request create_file_req;
- create_file_req.ignore_metadata = true;
- create_file_req.file_name = cold_backup::get_app_metadata_file(_backup_service->backup_root(),
- _policy.app_names.at(app_id),
- app_id,
- _cur_backup.backup_id);
-
- dsn::error_code err;
- dist::block_service::block_file_ptr remote_file;
- // here we can use synchronous way coz create_file with ignored metadata is very fast
- _block_service
- ->create_file(create_file_req,
- TASK_CODE_EXEC_INLINED,
- [&err, &remote_file](const dist::block_service::create_file_response &resp) {
- err = resp.err;
- remote_file = resp.file_handle;
- })
- ->wait();
- if (err != dsn::ERR_OK) {
- LOG_ERROR("{}: create file {} failed, restart this backup later",
- _backup_sig,
- create_file_req.file_name);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this, app_id]() {
- zauto_lock l(_lock);
- start_backup_app_meta_unlocked(app_id);
- },
- 0,
- _backup_service->backup_option().block_retry_delay_ms);
- return;
- }
- CHECK_NOTNULL(remote_file,
- "{}: create file({}) succeed, but can't get handle",
- _backup_sig,
- create_file_req.file_name);
-
- remote_file->write(
- dist::block_service::write_request{buffer},
- LPC_DEFAULT_CALLBACK,
- [this, remote_file, buffer, app_id](const dist::block_service::write_response &resp) {
- if (resp.err == dsn::ERR_OK) {
- CHECK_EQ(resp.written_size, buffer.length());
- {
- zauto_lock l(_lock);
- LOG_INFO("{}: successfully backup app metadata to {}",
- _policy.policy_name,
- remote_file->file_name());
- start_backup_app_partitions_unlocked(app_id);
- }
- } else if (resp.err == ERR_FS_INTERNAL) {
- zauto_lock l(_lock);
- _is_backup_failed = true;
- LOG_ERROR("write {} failed, err = {}, don't try again when got this error.",
- remote_file->file_name(),
- resp.err);
- return;
- } else {
- LOG_WARNING("write {} failed, reason({}), try it later",
- remote_file->file_name(),
- resp.err);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this, app_id]() {
- zauto_lock l(_lock);
- start_backup_app_meta_unlocked(app_id);
- },
- 0,
- _backup_service->backup_option().block_retry_delay_ms);
- }
- },
- &_tracker);
-}
-
-void policy_context::start_backup_app_partitions_unlocked(int32_t app_id)
-{
- auto iter = _progress.unfinished_partitions_per_app.find(app_id);
- CHECK(iter != _progress.unfinished_partitions_per_app.end(),
- "{}: can't find app({}) in unfinished apps",
- _backup_sig,
- app_id);
- for (int32_t i = 0; i < iter->second; ++i) {
- start_backup_partition_unlocked(gpid(app_id, i));
- }
-}
-
-void policy_context::write_backup_app_finish_flag_unlocked(int32_t app_id,
- dsn::task_ptr write_callback)
-{
- if (_progress.is_app_skipped[app_id]) {
- LOG_WARNING("app is unavaliable, skip write finish flag for this app(app_id = {})", app_id);
- if (write_callback != nullptr) {
- write_callback->enqueue();
- }
- return;
- }
-
- backup_flag flag;
- flag.total_checkpoint_size = 0;
-
- for (const auto &pair : _progress.app_chkpt_size[app_id]) {
- flag.total_checkpoint_size += pair.second;
- }
-
- dsn::error_code err;
- dist::block_service::block_file_ptr remote_file;
-
- dist::block_service::create_file_request create_file_req;
- create_file_req.ignore_metadata = true;
- create_file_req.file_name =
- cold_backup::get_app_backup_status_file(_backup_service->backup_root(),
- _policy.app_names.at(app_id),
- app_id,
- _cur_backup.backup_id);
- // here we can use synchronous way coz create_file with ignored metadata is very fast
- _block_service
- ->create_file(create_file_req,
- TASK_CODE_EXEC_INLINED,
- [&err, &remote_file](const dist::block_service::create_file_response &resp) {
- err = resp.err;
- remote_file = resp.file_handle;
- })
- ->wait();
-
- if (err != ERR_OK) {
- LOG_ERROR("{}: create file {} failed, restart this backup later",
- _backup_sig,
- create_file_req.file_name);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this, app_id, write_callback]() {
- zauto_lock l(_lock);
- write_backup_app_finish_flag_unlocked(app_id, write_callback);
- },
- 0,
- _backup_service->backup_option().block_retry_delay_ms);
- return;
- }
-
- CHECK_NOTNULL(remote_file,
- "{}: create file({}) succeed, but can't get handle",
- _backup_sig,
- create_file_req.file_name);
- if (remote_file->get_size() > 0) {
- // we only focus whether app_backup_status file is exist, so ignore app_backup_status file's
- // context
- LOG_INFO("app({}) already write finish-flag on block service", app_id);
- if (write_callback != nullptr) {
- write_callback->enqueue();
- }
- return;
- }
-
- blob buf = ::dsn::json::json_forwarder<backup_flag>::encode(flag);
-
- remote_file->write(
- dist::block_service::write_request{buf},
- LPC_DEFAULT_CALLBACK,
- [this, app_id, write_callback, remote_file](
- const dist::block_service::write_response &resp) {
- if (resp.err == ERR_OK) {
- LOG_INFO("app({}) finish backup and write finish-flag on block service succeed",
- app_id);
- if (write_callback != nullptr) {
- write_callback->enqueue();
- }
- } else if (resp.err == ERR_FS_INTERNAL) {
- zauto_lock l(_lock);
- _is_backup_failed = true;
- LOG_ERROR("write {} failed, err = {}, don't try again when got this error.",
- remote_file->file_name(),
- resp.err);
- return;
- } else {
- LOG_WARNING("write {} failed, reason({}), try it later",
- remote_file->file_name(),
- resp.err);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this, app_id, write_callback]() {
- zauto_lock l(_lock);
- write_backup_app_finish_flag_unlocked(app_id, write_callback);
- },
- 0,
- _backup_service->backup_option().block_retry_delay_ms);
- }
- });
-}
-
-void policy_context::finish_backup_app_unlocked(int32_t app_id)
-{
- LOG_INFO("{}: finish backup for app({}), progress({})",
- _backup_sig,
- app_id,
- _progress.unfinished_apps);
- if (--_progress.unfinished_apps == 0) {
- LOG_INFO("{}: finish current backup for all apps", _backup_sig);
- _cur_backup.end_time_ms = dsn_now_ms();
-
- task_ptr write_backup_info_callback =
- tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() {
- task_ptr start_a_new_backup =
- tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() {
- zauto_lock l(_lock);
- auto iter = _backup_history.emplace(_cur_backup.backup_id, _cur_backup);
- CHECK(iter.second,
- "{}: backup_id({}) already in the backup_history",
- _policy.policy_name,
- _cur_backup.backup_id);
- _cur_backup.start_time_ms = 0;
- _cur_backup.end_time_ms = 0;
- LOG_INFO("{}: finish an old backup, try to start a new one", _backup_sig);
- issue_new_backup_unlocked();
- });
- sync_backup_to_remote_storage_unlocked(_cur_backup, start_a_new_backup, false);
- });
- write_backup_info_unlocked(_cur_backup, write_backup_info_callback);
- }
-}
-
-void policy_context::write_backup_info_unlocked(const backup_info &b_info,
- dsn::task_ptr write_callback)
-{
- dsn::error_code err;
- dist::block_service::block_file_ptr remote_file;
-
- dist::block_service::create_file_request create_file_req;
- create_file_req.ignore_metadata = true;
- create_file_req.file_name =
- cold_backup::get_backup_info_file(_backup_service->backup_root(), b_info.backup_id);
- // here we can use synchronous way coz create_file with ignored metadata is very fast
- _block_service
- ->create_file(create_file_req,
- TASK_CODE_EXEC_INLINED,
- [&err, &remote_file](const dist::block_service::create_file_response &resp) {
- err = resp.err;
- remote_file = resp.file_handle;
- })
- ->wait();
-
- if (err != ERR_OK) {
- LOG_ERROR("{}: create file {} failed, restart this backup later",
- _backup_sig,
- create_file_req.file_name);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this, b_info, write_callback]() {
- zauto_lock l(_lock);
- write_backup_info_unlocked(b_info, write_callback);
- },
- 0,
- _backup_service->backup_option().block_retry_delay_ms);
- return;
- }
-
- CHECK_NOTNULL(remote_file,
- "{}: create file({}) succeed, but can't get handle",
- _backup_sig,
- create_file_req.file_name);
-
- blob buf = dsn::json::json_forwarder<backup_info>::encode(b_info);
-
- remote_file->write(
- dist::block_service::write_request{buf},
- LPC_DEFAULT_CALLBACK,
- [this, b_info, write_callback, remote_file](
- const dist::block_service::write_response &resp) {
- if (resp.err == ERR_OK) {
- LOG_INFO("policy({}) write backup_info to cold backup media succeed",
- _policy.policy_name);
- if (write_callback != nullptr) {
- write_callback->enqueue();
- }
- } else if (resp.err == ERR_FS_INTERNAL) {
- zauto_lock l(_lock);
- _is_backup_failed = true;
- LOG_ERROR("write {} failed, err = {}, don't try again when got this error.",
- remote_file->file_name(),
- resp.err);
- return;
- } else {
- LOG_WARNING("write {} failed, reason({}), try it later",
- remote_file->file_name(),
- resp.err);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this, b_info, write_callback]() {
- zauto_lock l(_lock);
- write_backup_info_unlocked(b_info, write_callback);
- },
- 0,
- _backup_service->backup_option().block_retry_delay_ms);
- }
- });
-}
-
-bool policy_context::update_partition_progress_unlocked(gpid pid,
- int32_t progress,
- const host_port &source)
-{
- int32_t &local_progress = _progress.partition_progress[pid];
- if (local_progress == cold_backup_constant::PROGRESS_FINISHED) {
- LOG_WARNING(
- "{}: backup of partition {} has been finished, ignore the backup response from {} ",
- _backup_sig,
- pid,
- source);
- return true;
- }
-
- if (progress < local_progress) {
- LOG_WARNING("{}: local backup progress {} is larger than progress {} from server {} for "
- "partition {}, perhaps it's primary has changed",
- _backup_sig,
- local_progress,
- progress,
- source,
- pid);
- }
-
- local_progress = progress;
- LOG_DEBUG("{}: update partition {} backup progress to {}.", _backup_sig, pid, progress);
- if (local_progress == cold_backup_constant::PROGRESS_FINISHED) {
- LOG_INFO("{}: finish backup for partition {}, the app has {} unfinished backup "
- "partition now.",
- _backup_sig,
- pid,
- _progress.unfinished_partitions_per_app[pid.get_app_id()]);
-
- // update the progress-chain: partition => app => current_backup_instance
- if (--_progress.unfinished_partitions_per_app[pid.get_app_id()] == 0) {
- dsn::task_ptr task_after_write_finish_flag =
- tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, pid]() {
- zauto_lock l(_lock);
- finish_backup_app_unlocked(pid.get_app_id());
- });
- write_backup_app_finish_flag_unlocked(pid.get_app_id(), task_after_write_finish_flag);
- }
- }
- return local_progress == cold_backup_constant::PROGRESS_FINISHED;
-}
-
-void policy_context::record_partition_checkpoint_size_unlock(const gpid &pid, int64_t size)
-{
- _progress.app_chkpt_size[pid.get_app_id()][pid.get_partition_index()] = size;
-}
-
-void policy_context::start_backup_partition_unlocked(gpid pid)
-{
- dsn::host_port partition_primary;
- {
- // check app and partition status
- zauto_read_lock l;
- _backup_service->get_state()->lock_read(l);
- const app_state *app = _backup_service->get_state()->get_app(pid.get_app_id()).get();
-
- if (app == nullptr || app->status == app_status::AS_DROPPED) {
- LOG_WARNING(
- "{}: app {} is not available, skip to backup it.", _backup_sig, pid.get_app_id());
- _progress.is_app_skipped[pid.get_app_id()] = true;
- update_partition_progress_unlocked(
- pid, cold_backup_constant::PROGRESS_FINISHED, dsn::host_port());
- return;
- }
- partition_primary = app->partitions[pid.get_partition_index()].hp_primary;
- }
- if (partition_primary.is_invalid()) {
- LOG_WARNING("{}: partition {} doesn't have a primary now, retry to backup it later",
- _backup_sig,
- pid);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this, pid]() {
- zauto_lock l(_lock);
- start_backup_partition_unlocked(pid);
- },
- 0,
- _backup_service->backup_option().reconfiguration_retry_delay_ms);
- return;
- }
-
- backup_request req;
- req.pid = pid;
- req.policy = *(static_cast<const policy_info *>(&_policy));
- req.backup_id = _cur_backup.backup_id;
- req.app_name = _policy.app_names.at(pid.get_app_id());
- dsn::message_ex *request =
- dsn::message_ex::create_request(RPC_COLD_BACKUP, 0, pid.thread_hash());
- dsn::marshall(request, req);
- dsn::rpc_response_task_ptr rpc_callback = rpc::create_rpc_response_task(
- request,
- &_tracker,
- [this, pid, partition_primary](error_code err, backup_response &&response) {
- on_backup_reply(err, std::move(response), pid, partition_primary);
- });
- LOG_INFO("{}: send backup command to partition {}, target_addr = {}",
- _backup_sig,
- pid,
- partition_primary);
- _backup_service->get_meta_service()->send_request(request, partition_primary, rpc_callback);
-}
-
-void policy_context::on_backup_reply(error_code err,
- backup_response &&response,
- gpid pid,
- const host_port &primary)
-{
- LOG_INFO(
- "{}: receive backup response for partition {} from server {}.", _backup_sig, pid, primary);
- if (err == dsn::ERR_OK && response.err == dsn::ERR_OK) {
- CHECK_EQ_MSG(response.policy_name,
- _policy.policy_name,
- "policy names don't match, pid({}), replica_server({})",
- pid,
- primary);
- CHECK_EQ_MSG(response.pid,
- pid,
- "{}: backup pids don't match, replica_server({})",
- _policy.policy_name,
- primary);
- CHECK_LE_MSG(response.backup_id,
- _cur_backup.backup_id,
- "{}: replica server({}) has bigger backup_id({}), gpid({})",
- _backup_sig,
- primary,
- response.backup_id,
- pid);
-
- if (response.backup_id < _cur_backup.backup_id) {
- LOG_WARNING("{}: got a backup response of partition {} from server {}, whose backup id "
- "{} is smaller than current backup id {}, maybe it is a stale message",
- _backup_sig,
- pid,
- primary,
- response.backup_id,
- _cur_backup.backup_id);
- } else {
- zauto_lock l(_lock);
- record_partition_checkpoint_size_unlock(pid, response.checkpoint_total_size);
- if (update_partition_progress_unlocked(pid, response.progress, primary)) {
- // partition backup finished
- return;
- }
- }
- } else if (response.err == dsn::ERR_LOCAL_APP_FAILURE) {
- zauto_lock l(_lock);
- _is_backup_failed = true;
- LOG_ERROR("{}: backup got error {} for partition {} from {}, don't try again when got "
- "this error.",
- _backup_sig.c_str(),
- response.err,
- pid,
- primary);
- return;
- } else {
- LOG_WARNING(
- "{}: backup got error for partition {} from {}, rpc error {}, response error {}",
- _backup_sig.c_str(),
- pid,
- primary,
- err,
- response.err);
- }
-
- // retry to backup the partition.
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this, pid]() {
- zauto_lock l(_lock);
- start_backup_partition_unlocked(pid);
- },
- 0,
- _backup_service->backup_option().request_backup_period_ms);
-}
-
-void policy_context::initialize_backup_progress_unlocked()
-{
- _progress.reset();
-
- zauto_read_lock l;
- _backup_service->get_state()->lock_read(l);
-
- // NOTICE: the unfinished_apps is initialized with the app-set's size
- // even if some apps are not available.
- _progress.unfinished_apps = _cur_backup.app_ids.size();
- for (const int32_t &app_id : _cur_backup.app_ids) {
- const std::shared_ptr<app_state> &app = _backup_service->get_state()->get_app(app_id);
- _progress.is_app_skipped[app_id] = true;
- if (app == nullptr) {
- LOG_WARNING("{}: app id({}) is invalid", _policy.policy_name, app_id);
- } else if (app->status != app_status::AS_AVAILABLE) {
- LOG_WARNING("{}: {} is not available, status({})",
- _policy.policy_name,
- app->get_logname(),
- enum_to_string(app->status));
- } else {
- // NOTICE: only available apps have entry in
- // unfinished_partitions_per_app & partition_progress & app_chkpt_size
- _progress.unfinished_partitions_per_app[app_id] = app->partition_count;
- std::map<int, int64_t> partition_chkpt_size;
- for (const partition_configuration &pc : app->partitions) {
- _progress.partition_progress[pc.pid] = 0;
- partition_chkpt_size[pc.pid.get_app_id()] = 0;
- }
- _progress.app_chkpt_size[app_id] = std::move(partition_chkpt_size);
- _progress.is_app_skipped[app_id] = false;
- }
- }
-}
-
-void policy_context::prepare_current_backup_on_new_unlocked()
-{
- // initialize the current backup structure
- _cur_backup.backup_id = _cur_backup.start_time_ms = static_cast<int64_t>(dsn_now_ms());
- _cur_backup.app_ids = _policy.app_ids;
- _cur_backup.app_names = _policy.app_names;
- _is_backup_failed = false;
-
- initialize_backup_progress_unlocked();
- _backup_sig =
- _policy.policy_name + "@" + boost::lexical_cast<std::string>(_cur_backup.backup_id);
-}
-
-void policy_context::sync_backup_to_remote_storage_unlocked(const backup_info &b_info,
- task_ptr sync_callback,
- bool create_new_node)
-{
- dsn::blob backup_data = dsn::json::json_forwarder<backup_info>::encode(b_info);
- std::string backup_info_path =
- _backup_service->get_backup_path(_policy.policy_name, b_info.backup_id);
-
- auto callback = [this, b_info, sync_callback, create_new_node](dsn::error_code err) {
- if (dsn::ERR_OK == err || (create_new_node && ERR_NODE_ALREADY_EXIST == err)) {
- LOG_INFO("{}: synced backup_info({}) to remote storage successfully, "
- "start real backup work, new_node_create({})",
- _policy.policy_name,
- b_info.backup_id,
- create_new_node ? "true" : "false");
- if (sync_callback != nullptr) {
- sync_callback->enqueue();
- } else {
- LOG_WARNING("{}: empty callback", _policy.policy_name);
- }
- } else if (ERR_TIMEOUT == err) {
- LOG_ERROR("{}: sync backup info({}) to remote storage got timeout, retry it later",
- _policy.policy_name,
- b_info.backup_id);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this, b_info, sync_callback, create_new_node]() {
- zauto_lock l(_lock);
- sync_backup_to_remote_storage_unlocked(
- std::move(b_info), std::move(sync_callback), create_new_node);
- },
- 0,
- _backup_service->backup_option().meta_retry_delay_ms);
- } else {
- CHECK(false, "{}: we can't handle this right now, error({})", _backup_sig, err);
- }
- };
-
- if (create_new_node) {
- _backup_service->get_meta_service()->get_remote_storage()->create_node(
- backup_info_path, LPC_DEFAULT_CALLBACK, callback, backup_data, nullptr);
- } else {
- _backup_service->get_meta_service()->get_remote_storage()->set_data(
- backup_info_path, backup_data, LPC_DEFAULT_CALLBACK, callback, nullptr);
- }
-}
-
-void policy_context::continue_current_backup_unlocked()
-{
- if (_policy.is_disable) {
- LOG_INFO("{}: policy is disabled, ignore this backup and try it later",
- _policy.policy_name);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this]() {
- zauto_lock l(_lock);
- issue_new_backup_unlocked();
- },
- 0,
- _backup_service->backup_option().issue_backup_interval_ms);
- return;
- }
-
- for (const int32_t &app : _cur_backup.app_ids) {
- if (_progress.unfinished_partitions_per_app.find(app) !=
- _progress.unfinished_partitions_per_app.end()) {
- start_backup_app_meta_unlocked(app);
- } else {
- dsn::task_ptr task_after_write_finish_flag =
- tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, app]() {
- zauto_lock l(_lock);
- finish_backup_app_unlocked(app);
- });
- write_backup_app_finish_flag_unlocked(app, task_after_write_finish_flag);
- }
- }
-}
-
-bool policy_context::should_start_backup_unlocked()
-{
- uint64_t now = dsn_now_ms();
- uint64_t recent_backup_start_time_ms = 0;
- if (!_backup_history.empty()) {
- recent_backup_start_time_ms = _backup_history.rbegin()->second.start_time_ms;
- }
-
- // the true start time of recent backup have drifted away with the origin start time of
- // policy,
- // so we should take the drift into consideration; if user change the start time of the
- // policy,
- // we just think the change of start time as drift
- int32_t hour = 0, min = 0, sec = 0;
- if (recent_backup_start_time_ms == 0) {
- // the first time to backup, just consider the start time
- ::dsn::utils::time_ms_to_date_time(now, hour, min, sec);
- return _policy.start_time.should_start_backup(hour, min);
- } else {
- uint64_t next_backup_time_ms =
- recent_backup_start_time_ms + _policy.backup_interval_seconds * 1000;
- if (_policy.start_time.hour != 24) {
- // user have specify the time point to start backup, so we should take the the
- // time-drift into consideration
-
- // compute the time-drift
- ::dsn::utils::time_ms_to_date_time(recent_backup_start_time_ms, hour, min, sec);
- int64_t time_dirft_ms = _policy.start_time.compute_time_drift_ms(hour, min);
-
- if (time_dirft_ms >= 0) {
- // hour:min(the true start time) >= policy.start_time :
- // 1, user move up the start time of policy, such as 20:00 to 2:00, we just
- // think this case as time drift
- // 2, the true start time of backup is delayed, compared the origin start time
- // of policy, we should process this case
- // 3, the true start time of backup is the same with the origin start time of
- // policy
- next_backup_time_ms -= time_dirft_ms;
- } else {
- // hour:min(the true start time) < policy.start_time:
- // 1, user delay the start time of policy, such as 2:00 to 23:00
- //
- // these case has already been handled, we do nothing
- }
- }
- if (next_backup_time_ms <= now) {
- ::dsn::utils::time_ms_to_date_time(now, hour, min, sec);
- return _policy.start_time.should_start_backup(hour, min);
- } else {
- return false;
- }
- }
-}
-
-void policy_context::issue_new_backup_unlocked()
-{
- // before issue new backup, we check whether the policy is dropped
- if (_policy.is_disable) {
- LOG_INFO("{}: policy is disabled, just ignore backup, try it later", _policy.policy_name);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this]() {
- zauto_lock l(_lock);
- issue_new_backup_unlocked();
- },
- 0,
- _backup_service->backup_option().issue_backup_interval_ms);
- return;
- }
-
- if (!should_start_backup_unlocked()) {
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this]() {
- zauto_lock l(_lock);
- issue_new_backup_unlocked();
- },
- 0,
- _backup_service->backup_option().issue_backup_interval_ms);
- LOG_INFO("{}: start issue new backup {}ms later",
- _policy.policy_name,
- _backup_service->backup_option().issue_backup_interval_ms.count());
- return;
- }
-
- prepare_current_backup_on_new_unlocked();
- // if all apps are dropped, we don't issue a new backup
- if (_progress.unfinished_partitions_per_app.empty()) {
- // TODO: just ignore this backup and wait next backup
- LOG_WARNING("{}: all apps have been dropped, ignore this backup and retry it later",
- _backup_sig);
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this]() {
- zauto_lock l(_lock);
- issue_new_backup_unlocked();
- },
- 0,
- _backup_service->backup_option().issue_backup_interval_ms);
- } else {
- task_ptr continue_to_backup =
- tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() {
- zauto_lock l(_lock);
- continue_current_backup_unlocked();
- });
- sync_backup_to_remote_storage_unlocked(_cur_backup, continue_to_backup, true);
- }
-}
-
-void policy_context::start()
-{
- zauto_lock l(_lock);
-
- if (_cur_backup.start_time_ms == 0) {
- issue_new_backup_unlocked();
- } else {
- continue_current_backup_unlocked();
- }
-
- CHECK(!_policy.policy_name.empty(), "policy_name should has been initialized");
- _metrics = std::make_unique<backup_policy_metrics>(_policy.policy_name);
-
- issue_gc_backup_info_task_unlocked();
- LOG_INFO("{}: start gc backup info task succeed", _policy.policy_name);
-}
-
-void policy_context::add_backup_history(const backup_info &info)
-{
- zauto_lock l(_lock);
- if (info.end_time_ms <= 0) {
- LOG_INFO("{}: encounter an unfished backup_info({}), start_time({}), continue it later",
- _policy.policy_name,
- info.backup_id,
- info.start_time_ms);
-
- CHECK_EQ_MSG(_cur_backup.start_time_ms,
- 0,
- "{}: shouldn't have multiple unfinished backup instance in a policy, {} vs {}",
- _policy.policy_name,
- _cur_backup.backup_id,
- info.backup_id);
- CHECK(_backup_history.empty() || info.backup_id > _backup_history.rbegin()->first,
- "{}: backup_id({}) in history larger than current({})",
- _policy.policy_name,
- _backup_history.rbegin()->first,
- info.backup_id);
- _cur_backup = info;
- initialize_backup_progress_unlocked();
- _backup_sig =
- _policy.policy_name + "@" + boost::lexical_cast<std::string>(_cur_backup.backup_id);
- } else {
- LOG_INFO("{}: add backup history, id({}), start_time({}), endtime({})",
- _policy.policy_name,
- info.backup_id,
- info.start_time_ms,
- info.end_time_ms);
- CHECK(_cur_backup.end_time_ms == 0 || info.backup_id < _cur_backup.backup_id,
- "{}: backup_id({}) in history larger than current({})",
- _policy.policy_name,
- info.backup_id,
- _cur_backup.backup_id);
-
- auto result_pair = _backup_history.emplace(info.backup_id, info);
- CHECK(
- result_pair.second, "{}: conflict backup id({})", _policy.policy_name, info.backup_id);
- }
-}
-
-std::vector<backup_info> policy_context::get_backup_infos(int cnt)
-{
- zauto_lock l(_lock);
-
- std::vector<backup_info> ret;
-
- if (cnt > 0 && _cur_backup.start_time_ms > 0) {
- ret.emplace_back(_cur_backup);
- cnt--;
- }
-
- for (auto it = _backup_history.rbegin(); it != _backup_history.rend() && cnt > 0; it++) {
- cnt--;
- ret.emplace_back(it->second);
- }
- return ret;
-}
-
-bool policy_context::is_under_backuping()
-{
- zauto_lock l(_lock);
- if (!_is_backup_failed && _cur_backup.start_time_ms > 0 && _cur_backup.end_time_ms <= 0) {
- return true;
- }
- return false;
-}
-
-void policy_context::set_policy(const policy &p)
-{
- zauto_lock l(_lock);
-
- const std::string old_backup_provider_type = _policy.backup_provider_type;
- _policy = p;
- if (_policy.backup_provider_type != old_backup_provider_type) {
- _block_service = _backup_service->get_meta_service()
- ->get_block_service_manager()
- .get_or_create_block_filesystem(_policy.backup_provider_type);
- }
- CHECK(_block_service,
- "can't initialize block filesystem by provider ({})",
- _policy.backup_provider_type);
-}
-
-policy policy_context::get_policy()
-{
- zauto_lock l(_lock);
- return _policy;
-}
-
-void policy_context::gc_backup_info_unlocked(const backup_info &info_to_gc)
-{
- char start_time[30] = {'\0'};
- char end_time[30] = {'\0'};
- ::dsn::utils::time_ms_to_date_time(
- static_cast<uint64_t>(info_to_gc.start_time_ms), start_time, 30);
- ::dsn::utils::time_ms_to_date_time(static_cast<uint64_t>(info_to_gc.end_time_ms), end_time, 30);
- LOG_INFO("{}: start to gc backup info, backup_id({}), start_time({}), end_time({})",
- _policy.policy_name,
- info_to_gc.backup_id,
- start_time,
- end_time);
-
- dsn::task_ptr sync_callback =
- ::dsn::tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, info_to_gc]() {
- dist::block_service::remove_path_request req;
- req.path =
- cold_backup::get_backup_path(_backup_service->backup_root(), info_to_gc.backup_id);
- req.recursive = true;
- _block_service->remove_path(
- req,
- LPC_DEFAULT_CALLBACK,
- [this, info_to_gc](const dist::block_service::remove_path_response &resp) {
- // remove dir ok or dir is not exist
- if (resp.err == ERR_OK || resp.err == ERR_OBJECT_NOT_FOUND) {
- dsn::task_ptr remove_local_backup_info_task = tasking::create_task(
- LPC_DEFAULT_CALLBACK, &_tracker, [this, info_to_gc]() {
- zauto_lock l(_lock);
- _backup_history.erase(info_to_gc.backup_id);
- issue_gc_backup_info_task_unlocked();
- });
- sync_remove_backup_info(info_to_gc, remove_local_backup_info_task);
- } else { // ERR_FS_INTERNAL, ERR_TIMEOUT, ERR_DIR_NOT_EMPTY
- LOG_WARNING(
- "{}: gc backup info, id({}) failed, with err = {}, just try again",
- _policy.policy_name,
- info_to_gc.backup_id,
- resp.err);
- gc_backup_info_unlocked(info_to_gc);
- }
- });
- });
- sync_backup_to_remote_storage_unlocked(info_to_gc, sync_callback, false);
-}
-
-void policy_context::issue_gc_backup_info_task_unlocked()
-{
- if (_backup_history.size() > _policy.backup_history_count_to_keep) {
- backup_info &info = _backup_history.begin()->second;
- info.info_status = backup_info_status::type::DELETING;
- LOG_INFO("{}: start to gc backup info with id({})", _policy.policy_name, info.backup_id);
-
- tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, info]() {
- gc_backup_info_unlocked(info);
- })->enqueue();
- } else {
- // there is no extra backup to gc, we just issue a new task to call
- // issue_gc_backup_info_task_unlocked later
- LOG_DEBUG("{}: no need to gc backup info, start it later", _policy.policy_name);
- tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() {
- zauto_lock l(_lock);
- issue_gc_backup_info_task_unlocked();
- })->enqueue(std::chrono::minutes(3));
- }
-
- // update recent backup duration time
- uint64_t last_backup_duration_time_ms = 0;
- if (_cur_backup.start_time_ms == 0) {
- if (!_backup_history.empty()) {
- const backup_info &b_info = _backup_history.rbegin()->second;
- last_backup_duration_time_ms = (b_info.end_time_ms - b_info.start_time_ms);
- }
- } else if (_cur_backup.start_time_ms > 0) {
- if (_cur_backup.end_time_ms == 0) {
- last_backup_duration_time_ms = (dsn_now_ms() - _cur_backup.start_time_ms);
- } else if (_cur_backup.end_time_ms > 0) {
- last_backup_duration_time_ms = (_cur_backup.end_time_ms - _cur_backup.start_time_ms);
- }
- }
- METRIC_SET(*_metrics, backup_recent_duration_ms, last_backup_duration_time_ms);
-}
-
-void policy_context::sync_remove_backup_info(const backup_info &info, dsn::task_ptr sync_callback)
-{
- std::string backup_info_path =
- _backup_service->get_backup_path(_policy.policy_name, info.backup_id);
- auto callback = [this, info, sync_callback](dsn::error_code err) {
- if (err == dsn::ERR_OK || err == dsn::ERR_OBJECT_NOT_FOUND) {
- LOG_INFO("{}: sync remove backup_info on remote storage successfully, backup_id({})",
- _policy.policy_name,
- info.backup_id);
- if (sync_callback != nullptr) {
- sync_callback->enqueue();
- }
- } else if (err == ERR_TIMEOUT) {
- LOG_ERROR("{}: sync remove backup info on remote storage got timeout, retry it later",
- _policy.policy_name);
- tasking::enqueue(
- LPC_DEFAULT_CALLBACK,
- &_tracker,
- [this, info, sync_callback]() { sync_remove_backup_info(info, sync_callback); },
- 0,
- _backup_service->backup_option().meta_retry_delay_ms);
- } else {
- CHECK(false, "{}: we can't handle this right now, error({})", _policy.policy_name, err);
- }
- };
-
- _backup_service->get_meta_service()->get_remote_storage()->delete_node(
- backup_info_path, true, LPC_DEFAULT_CALLBACK, callback, nullptr);
-}
-
backup_service::backup_service(meta_service *meta_svc,
const std::string &policy_meta_root,
- const std::string &backup_root,
- const policy_factory &factory)
- : _factory(factory),
- _meta_svc(meta_svc),
- _policy_meta_root(policy_meta_root),
- _backup_root(backup_root)
+ const std::string &backup_root)
+ : _meta_svc(meta_svc), _policy_meta_root(policy_meta_root), _backup_root(backup_root)
{
_state = _meta_svc->get_server_state();
-
- _opt.meta_retry_delay_ms = 10000_ms;
- _opt.block_retry_delay_ms = 60000_ms;
- _opt.app_dropped_retry_delay_ms = 600000_ms;
- _opt.reconfiguration_retry_delay_ms = 15000_ms;
- _opt.request_backup_period_ms = 10000_ms;
- _opt.issue_backup_interval_ms = 300000_ms;
-
- _in_initialize.store(true);
}
-void backup_service::start_create_policy_meta_root(dsn::task_ptr callback)
-{
- LOG_DEBUG("create policy meta root({}) on remote_storage", _policy_meta_root);
- _meta_svc->get_remote_storage()->create_node(
- _policy_meta_root, LPC_DEFAULT_CALLBACK, [this, callback](dsn::error_code err) {
- if (err == dsn::ERR_OK || err == ERR_NODE_ALREADY_EXIST) {
- LOG_INFO(
- "create policy meta root({}) succeed, with err({})", _policy_meta_root, err);
- callback->enqueue();
- } else if (err == dsn::ERR_TIMEOUT) {
- LOG_ERROR("create policy meta root({}) timeout, try it later", _policy_meta_root);
- dsn::tasking::enqueue(
- LPC_DEFAULT_CALLBACK,
- &_tracker,
- std::bind(&backup_service::start_create_policy_meta_root, this, callback),
- 0,
- _opt.meta_retry_delay_ms);
- } else {
- CHECK(false, "we can't handle this error({}) right now", err);
- }
- });
-}
+// TODO(heyuchen): implement it; for now start() is an empty stub that does nothing.
+void backup_service::start() {}
-void backup_service::start_sync_policies()
-{
- // TODO: make sync_policies_from_remote_storage function to async
- // sync-api will leader to deadlock when the threadnum = 1 in default threadpool
- LOG_INFO("backup service start to sync policies from remote storage");
- dsn::error_code err = sync_policies_from_remote_storage();
- if (err == dsn::ERR_OK) {
- for (auto &policy_kv : _policy_states) {
- LOG_INFO("policy({}) start to backup", policy_kv.first);
- policy_kv.second->start();
- }
- if (_policy_states.empty()) {
- LOG_WARNING(
- "can't sync policies from remote storage, user should config some policies");
- }
- _in_initialize.store(false);
- } else if (err == dsn::ERR_TIMEOUT) {
- LOG_ERROR("sync policies got timeout, retry it later");
- dsn::tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- std::bind(&backup_service::start_sync_policies, this),
- 0,
- _opt.meta_retry_delay_ms);
- } else {
- CHECK(false,
- "sync policies from remote storage encounter error({}), we can't handle "
- "this right now");
- }
-}
+// TODO(heyuchen): implement it; this empty stub ignores the request and sets no response.
+void backup_service::start_backup_app(start_backup_app_rpc rpc) {}
-error_code backup_service::sync_policies_from_remote_storage()
-{
- // policy on remote storage:
- // -- <root>/policy_name/backup_id_1
- // -- /backup_id_2
- error_code err = ERR_OK;
- dsn::task_tracker tracker;
-
- auto init_backup_info = [this, &err, &tracker](const std::string &policy_name) {
- auto after_get_backup_info = [this, &err, policy_name](error_code ec, const blob &value) {
- if (ec == ERR_OK) {
- LOG_DEBUG("sync a backup string({}) from remote storage", value.data());
- backup_info tbackup_info;
- dsn::json::json_forwarder<backup_info>::decode(value, tbackup_info);
-
- policy_context *ptr = nullptr;
- {
- zauto_lock l(_lock);
- auto it = _policy_states.find(policy_name);
- if (it == _policy_states.end()) {
- CHECK(false,
- "before initializing the backup_info, initialize the policy first");
- }
- ptr = it->second.get();
- }
- ptr->add_backup_history(tbackup_info);
- } else {
- err = ec;
- LOG_INFO("init backup_info from remote storage fail, error_code = {}", ec);
- }
- };
- std::string backup_info_root = get_policy_path(policy_name);
-
- _meta_svc->get_remote_storage()->get_children(
- backup_info_root,
- LPC_DEFAULT_CALLBACK, // TASK_CODE_EXEC_INLINED,
- [this, &err, &tracker, policy_name, after_get_backup_info](
- error_code ec, const std::vector<std::string> &children) {
- if (ec == ERR_OK) {
- if (children.size() > 0) {
- for (const auto &b_id : children) {
- int64_t backup_id = boost::lexical_cast<int64_t>(b_id);
- std::string backup_path = get_backup_path(policy_name, backup_id);
- LOG_INFO("start to acquire backup_info({}) of policy({})",
- backup_id,
- policy_name);
- _meta_svc->get_remote_storage()->get_data(
- backup_path,
- TASK_CODE_EXEC_INLINED,
- std::move(after_get_backup_info),
- &tracker);
- }
- } else // have not backup
- {
- LOG_INFO("policy has not started a backup process, policy_name = {}",
- policy_name);
- }
- } else {
- err = ec;
- LOG_ERROR("get backup info dirs fail from remote storage, backup_dirs_root = "
- "{}, err = {}",
- get_policy_path(policy_name),
- ec);
- }
- },
- &tracker);
- };
-
- auto init_one_policy =
- [this, &err, &tracker, &init_backup_info](const std::string &policy_name) {
- auto policy_path = get_policy_path(policy_name);
- LOG_INFO("start to acquire the context of policy({})", policy_name);
- _meta_svc->get_remote_storage()->get_data(
- policy_path,
- LPC_DEFAULT_CALLBACK, // TASK_CODE_EXEC_INLINED,
- [this, &err, &init_backup_info, policy_path, policy_name](error_code ec,
- const blob &value) {
- if (ec == ERR_OK) {
- std::shared_ptr<policy_context> policy_ctx = _factory(this);
- policy tpolicy;
- dsn::json::json_forwarder<policy>::decode(value, tpolicy);
- policy_ctx->set_policy(tpolicy);
-
- {
- zauto_lock l(_lock);
- _policy_states.insert(std::make_pair(policy_name, policy_ctx));
- }
- init_backup_info(policy_name);
- } else {
- err = ec;
- LOG_ERROR(
- "init policy fail, policy_path = {}, error_code = {}", policy_path, ec);
- }
- },
- &tracker);
- };
-
- _meta_svc->get_remote_storage()->get_children(
- _policy_meta_root,
- LPC_DEFAULT_CALLBACK, // TASK_CODE_EXEC_INLINED,,
- [&err, &init_one_policy](error_code ec, const std::vector<std::string> &children) {
- if (ec == ERR_OK) {
- // children's name is name of each policy
- for (const auto &policy_name : children) {
- init_one_policy(policy_name);
- }
- } else {
- err = ec;
- LOG_ERROR("get policy dirs from remote storage fail, error_code = {}", ec);
- }
- },
- &tracker);
- tracker.wait_outstanding_tasks();
- return err;
-}
-
-void backup_service::start()
-{
- dsn::task_ptr after_create_policy_meta_root =
- tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() { start_sync_policies(); });
- start_create_policy_meta_root(after_create_policy_meta_root);
-}
-
-void backup_service::add_backup_policy(dsn::message_ex *msg)
-{
- configuration_add_backup_policy_request request;
- configuration_add_backup_policy_response response;
- auto log_on_failed = dsn::defer([&response]() {
- if (!response.hint_message.empty()) {
- LOG_WARNING(response.hint_message);
- }
- });
-
- dsn::message_ex *copied_msg = message_ex::copy_message_no_reply(*msg);
- ::dsn::unmarshall(msg, request);
- std::set<int32_t> app_ids;
- std::map<int32_t, std::string> app_names;
-
- std::string hint_message;
- if (!validate_backup_interval(request.backup_interval_seconds, hint_message)) {
- response.err = ERR_INVALID_PARAMETERS;
- response.hint_message = hint_message;
- _meta_svc->reply_data(msg, response);
- msg->release_ref();
- return;
- }
-
- {
- // check app status
- zauto_read_lock l;
- _state->lock_read(l);
- for (auto &app_id : request.app_ids) {
- const std::shared_ptr<app_state> &app = _state->get_app(app_id);
- if (app == nullptr) {
- LOG_ERROR("app {} doesn't exist, policy {} shouldn't be added.",
- app_id,
- request.policy_name);
- response.err = ERR_INVALID_PARAMETERS;
- response.hint_message = "invalid app " + std::to_string(app_id);
- _meta_svc->reply_data(msg, response);
- msg->release_ref();
- return;
- }
- // when the Ranger ACL is enabled, access control will be checked for each table.
- auto access_controller = _meta_svc->get_access_controller();
- // adding multiple judgments here is to adapt to the old ACL and avoid checking again.
- if (access_controller->is_enable_ranger_acl() &&
- !access_controller->allowed(copied_msg, app->app_name)) {
- response.err = ERR_ACL_DENY;
- response.hint_message =
- fmt::format("not authorized to add backup policy({}) for app id: {}",
- request.policy_name,
- app_id);
- _meta_svc->reply_data(msg, response);
- msg->release_ref();
- return;
- }
- app_ids.insert(app_id);
- app_names.insert(std::make_pair(app_id, app->app_name));
- }
- }
-
- {
- // check policy name
- zauto_lock l(_lock);
- if (!is_valid_policy_name_unlocked(request.policy_name)) {
- response.err = ERR_INVALID_PARAMETERS;
- response.hint_message = "invalid policy_name: " + request.policy_name;
- _meta_svc->reply_data(msg, response);
- msg->release_ref();
- return;
- }
- }
-
- // check backup provider
- if (_meta_svc->get_block_service_manager().get_or_create_block_filesystem(
- request.backup_provider_type) == nullptr) {
- response.err = ERR_INVALID_PARAMETERS;
- response.hint_message = "invalid backup_provider_type: " + request.backup_provider_type;
- _meta_svc->reply_data(msg, response);
- msg->release_ref();
- return;
- }
-
- LOG_INFO("start to add backup polciy {}.", request.policy_name);
- std::shared_ptr<policy_context> policy_context_ptr = _factory(this);
- CHECK_NOTNULL(policy_context_ptr, "invalid policy_context");
- policy p;
- p.policy_name = request.policy_name;
- p.backup_provider_type = request.backup_provider_type;
- p.backup_interval_seconds = request.backup_interval_seconds;
- p.backup_history_count_to_keep = request.backup_history_count_to_keep;
- p.start_time.parse_from(request.start_time);
- p.app_ids = app_ids;
- p.app_names = app_names;
- policy_context_ptr->set_policy(p);
- do_add_policy(msg, policy_context_ptr, response.hint_message);
-}
-
-void backup_service::do_add_policy(dsn::message_ex *req,
- std::shared_ptr<policy_context> p,
- const std::string &hint_msg)
-{
- policy cur_policy = p->get_policy();
-
- std::string policy_path = get_policy_path(cur_policy.policy_name);
- blob value = json::json_forwarder<policy>::encode(cur_policy);
- _meta_svc->get_remote_storage()->create_node(
- policy_path,
- LPC_DEFAULT_CALLBACK, // TASK_CODE_EXEC_INLINED,
- [ this, req, p, hint_msg, policy_name = cur_policy.policy_name ](error_code err) {
- if (err == ERR_OK || err == ERR_NODE_ALREADY_EXIST) {
- configuration_add_backup_policy_response resp;
- resp.hint_message = hint_msg;
- resp.err = ERR_OK;
- LOG_INFO("add backup policy succeed, policy_name = {}", policy_name);
-
- _meta_svc->reply_data(req, resp);
- req->release_ref();
- {
- zauto_lock l(_lock);
- _policy_states.insert(std::make_pair(policy_name, p));
- }
- p->start();
- } else if (err == ERR_TIMEOUT) {
- LOG_ERROR("create backup policy on remote storage timeout, retry after {} ms",
- _opt.meta_retry_delay_ms.count());
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- std::bind(&backup_service::do_add_policy, this, req, p, hint_msg),
- 0,
- _opt.meta_retry_delay_ms);
- return;
- } else {
- CHECK(false, "we can't handle this when create backup policy, err({})", err);
- }
- },
- value);
-}
-
-void backup_service::do_update_policy_to_remote_storage(
- configuration_modify_backup_policy_rpc rpc,
- const policy &p,
- std::shared_ptr<policy_context> &p_context_ptr)
-{
- std::string policy_path = get_policy_path(p.policy_name);
- blob value = json::json_forwarder<policy>::encode(p);
- _meta_svc->get_remote_storage()->set_data(
- policy_path, value, LPC_DEFAULT_CALLBACK, [this, rpc, p, p_context_ptr](error_code err) {
- if (err == ERR_OK) {
- configuration_modify_backup_policy_response resp;
- resp.err = ERR_OK;
- LOG_INFO("update backup policy to remote storage succeed, policy_name = {}",
- p.policy_name);
- p_context_ptr->set_policy(p);
- } else if (err == ERR_TIMEOUT) {
- LOG_ERROR("update backup policy to remote storage failed, policy_name = {}, "
- "retry after {} ms",
- p.policy_name,
- _opt.meta_retry_delay_ms.count());
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- &_tracker,
- std::bind(&backup_service::do_update_policy_to_remote_storage,
- this,
- rpc,
- p,
- p_context_ptr),
- 0,
- _opt.meta_retry_delay_ms);
- } else {
- CHECK(false, "we can't handle this when create backup policy, err({})", err);
- }
- });
-}
-
-bool backup_service::is_valid_policy_name_unlocked(const std::string &policy_name)
-{
- auto iter = _policy_states.find(policy_name);
- return (iter == _policy_states.end());
-}
-
-void backup_service::query_backup_policy(query_backup_policy_rpc rpc)
-{
- const configuration_query_backup_policy_request &request = rpc.request();
- configuration_query_backup_policy_response &response = rpc.response();
- auto log_on_failed = dsn::defer([&response]() {
- if (!response.hint_msg.empty()) {
- LOG_WARNING(response.hint_msg);
- }
- });
-
- response.err = ERR_OK;
-
- std::vector<std::string> policy_names = request.policy_names;
- if (policy_names.empty()) {
- // default all the policy
- zauto_lock l(_lock);
- for (const auto &pair : _policy_states) {
- policy_names.emplace_back(pair.first);
- }
- }
- for (const auto &policy_name : policy_names) {
- std::shared_ptr<policy_context> policy_context_ptr(nullptr);
- {
- zauto_lock l(_lock);
- auto it = _policy_states.find(policy_name);
- if (it != _policy_states.end()) {
- policy_context_ptr = it->second;
- }
- }
- if (policy_context_ptr == nullptr) {
- if (!response.hint_msg.empty()) {
- response.hint_msg += "\n\t";
- }
- response.hint_msg += std::string("invalid policy_name " + policy_name);
- continue;
- }
-
- policy cur_policy = policy_context_ptr->get_policy();
- policy_entry p_entry;
- p_entry.policy_name = cur_policy.policy_name;
- p_entry.backup_provider_type = cur_policy.backup_provider_type;
- p_entry.backup_interval_seconds = std::to_string(cur_policy.backup_interval_seconds);
- p_entry.app_ids = cur_policy.app_ids;
- p_entry.backup_history_count_to_keep = cur_policy.backup_history_count_to_keep;
- p_entry.start_time = cur_policy.start_time.to_string();
- p_entry.is_disable = cur_policy.is_disable;
- response.policys.emplace_back(p_entry);
- // acquire backup_infos
- std::vector<backup_info> b_infos =
- policy_context_ptr->get_backup_infos(request.backup_info_count);
- std::vector<backup_entry> b_entries;
- for (const auto &b_info : b_infos) {
- backup_entry b_entry;
- b_entry.backup_id = b_info.backup_id;
- b_entry.start_time_ms = b_info.start_time_ms;
- b_entry.end_time_ms = b_info.end_time_ms;
- b_entry.app_ids = b_info.app_ids;
- b_entries.emplace_back(b_entry);
- }
- response.backup_infos.emplace_back(std::move(b_entries));
- // policy_context_ptr.reset();
- }
- if (response.policys.empty()) {
- // have not pass a valid policy_name
- if (!policy_names.empty()) {
- response.err = ERR_INVALID_PARAMETERS;
- }
- }
-
- if (!response.hint_msg.empty()) {
- response.__isset.hint_msg = true;
- }
-}
-
-void backup_service::modify_backup_policy(configuration_modify_backup_policy_rpc rpc)
-{
- const configuration_modify_backup_policy_request &request = rpc.request();
- configuration_modify_backup_policy_response &response = rpc.response();
- response.err = ERR_OK;
-
- auto log_on_failed = dsn::defer([&response]() {
- if (!response.hint_message.empty()) {
- LOG_WARNING(response.hint_message);
- }
- });
-
- std::shared_ptr<policy_context> context_ptr;
- {
- zauto_lock _(_lock);
- auto iter = _policy_states.find(request.policy_name);
- if (iter == _policy_states.end()) {
- response.err = ERR_INVALID_PARAMETERS;
- context_ptr = nullptr;
- } else {
- context_ptr = iter->second;
- }
- }
- if (context_ptr == nullptr) {
- return;
- }
- policy cur_policy = context_ptr->get_policy();
-
- bool is_under_backup = context_ptr->is_under_backuping();
- bool have_modify_policy = false;
- std::vector<int32_t> valid_app_ids_to_add;
- std::map<int32_t, std::string> id_to_app_names;
- if (request.__isset.add_appids) {
- // lock the _lock of server_state to acquire verify the apps that added to policy
- zauto_read_lock l;
- _state->lock_read(l);
-
- for (const auto &appid : request.add_appids) {
- const auto &app = _state->get_app(appid);
- auto access_controller = _meta_svc->get_access_controller();
- // TODO: if app is dropped, how to process
- if (app == nullptr) {
- LOG_WARNING("{}: add app to policy failed, because invalid app({}), ignore it",
- cur_policy.policy_name,
- appid);
- continue;
- }
- if (access_controller->is_enable_ranger_acl() &&
- !access_controller->allowed(rpc.dsn_request(), app->app_name)) {
- LOG_WARNING("not authorized to modify backup policy({}) for app id: {}, skip it",
- cur_policy.policy_name,
- appid);
- continue;
- }
- valid_app_ids_to_add.emplace_back(appid);
- id_to_app_names.insert(std::make_pair(appid, app->app_name));
- have_modify_policy = true;
- }
- }
-
- if (request.__isset.is_disable) {
- if (request.is_disable) {
- if (is_under_backup) {
- LOG_INFO("{}: policy is under backuping, not allow to disable",
- cur_policy.policy_name);
- response.err = ERR_BUSY;
- } else if (!cur_policy.is_disable) {
- LOG_INFO("{}: policy is marked to disable", cur_policy.policy_name);
- cur_policy.is_disable = true;
- have_modify_policy = true;
- } else { // cur_policy.is_disable = true
- LOG_INFO("{}: policy is already disabled", cur_policy.policy_name);
- }
- } else {
- if (cur_policy.is_disable) {
- cur_policy.is_disable = false;
- LOG_INFO("{}: policy is marked to enable", cur_policy.policy_name);
- have_modify_policy = true;
- } else {
- LOG_INFO("{}: policy is already enabled", cur_policy.policy_name);
- response.err = ERR_OK;
- response.hint_message = std::string("policy is already enabled");
- }
- }
- }
-
- if (request.__isset.add_appids && !valid_app_ids_to_add.empty()) {
- for (const auto &appid : valid_app_ids_to_add) {
- cur_policy.app_ids.insert(appid);
- cur_policy.app_names.insert(std::make_pair(appid, id_to_app_names.at(appid)));
- have_modify_policy = true;
- }
- }
-
- if (request.__isset.removal_appids) {
- for (const auto &appid : request.removal_appids) {
- if (appid > 0) {
- cur_policy.app_ids.erase(appid);
- LOG_INFO("{}: remove app({}) to policy", cur_policy.policy_name, appid);
- have_modify_policy = true;
- } else {
- LOG_WARNING("{}: invalid app_id({})", cur_policy.policy_name, appid);
- }
- }
- }
-
- if (request.__isset.new_backup_interval_sec) {
- std::string hint_message;
- if (validate_backup_interval(request.new_backup_interval_sec, hint_message)) {
- LOG_INFO("{}: policy will change backup interval from {}s to {}s",
- cur_policy.policy_name,
- cur_policy.backup_interval_seconds,
- request.new_backup_interval_sec);
- cur_policy.backup_interval_seconds = request.new_backup_interval_sec;
- have_modify_policy = true;
- } else {
- LOG_WARNING("{}: invalid backup_interval_sec({}), {}",
- cur_policy.policy_name,
- request.new_backup_interval_sec,
- hint_message);
- }
- }
-
- if (request.__isset.backup_history_count_to_keep) {
- if (request.backup_history_count_to_keep > 0) {
- LOG_INFO("{}: policy will change backup_history_count_to_keep from {} to {}",
- cur_policy.policy_name,
- cur_policy.backup_history_count_to_keep,
- request.backup_history_count_to_keep);
- cur_policy.backup_history_count_to_keep = request.backup_history_count_to_keep;
- have_modify_policy = true;
- }
- }
-
- if (request.__isset.start_time) {
- backup_start_time t_start_time;
- if (t_start_time.parse_from(request.start_time)) {
- LOG_INFO("{}: policy change start_time from {} to {}",
- cur_policy.policy_name,
- cur_policy.start_time,
- t_start_time);
- cur_policy.start_time = t_start_time;
- have_modify_policy = true;
- }
- }
-
- if (have_modify_policy) {
- do_update_policy_to_remote_storage(rpc, cur_policy, context_ptr);
- }
-}
-
-std::string backup_service::get_policy_path(const std::string &policy_name)
-{
- std::stringstream ss;
- ss << _policy_meta_root << "/" << policy_name;
- return ss.str();
-}
-
-std::string backup_service::get_backup_path(const std::string &policy_name, int64_t backup_id)
-{
- std::stringstream ss;
- ss << _policy_meta_root << "/" << policy_name << "/" << backup_id;
- return ss.str();
-}
-
-void backup_service::start_backup_app(start_backup_app_rpc rpc)
-{
- const start_backup_app_request &request = rpc.request();
- start_backup_app_response &response = rpc.response();
- auto log_on_failed = dsn::defer([&response]() {
- if (!response.hint_message.empty()) {
- LOG_WARNING(response.hint_message);
- }
- });
-
- int32_t app_id = request.app_id;
- auto engine = std::make_shared<backup_engine>(this);
- error_code err = engine->init_backup(app_id);
- if (err != ERR_OK) {
- response.err = err;
- response.hint_message = fmt::format("Backup failed: invalid app id {}.", app_id);
- return;
- }
-
- err = engine->set_block_service(request.backup_provider_type);
- if (err != ERR_OK) {
- response.err = err;
- response.hint_message = fmt::format("Backup failed: invalid backup_provider_type {}.",
- request.backup_provider_type);
- return;
- }
-
- if (request.__isset.backup_path) {
- err = engine->set_backup_path(request.backup_path);
- if (err != ERR_OK) {
- response.err = err;
- response.hint_message = "Backup failed: the default backup path has already configured "
- "in `hdfs_service`, please modify the configuration if you "
- "want to use a specific backup path.";
- return;
- }
- }
-
- {
- zauto_lock l(_lock);
- for (const auto &backup : _backup_states) {
- if (app_id == backup->get_backup_app_id() && backup->is_in_progress()) {
- response.err = ERR_INVALID_STATE;
- response.hint_message =
- fmt::format("Backup failed: app {} is actively being backed up.", app_id);
- return;
- }
- }
- }
-
- err = engine->start();
- if (err == ERR_OK) {
- int64_t backup_id = engine->get_current_backup_id();
- {
- zauto_lock l(_lock);
- _backup_states.emplace_back(std::move(engine));
- }
- response.__isset.backup_id = true;
- response.backup_id = backup_id;
- response.hint_message =
- fmt::format("Backup succeed: metadata of app {} has been successfully backed up "
- "and backup request has been sent to replica servers.",
- app_id);
- } else {
- response.hint_message =
- fmt::format("Backup failed: could not backup metadata for app {}.", app_id);
- }
- response.err = err;
-}
-
-void backup_service::query_backup_status(query_backup_status_rpc rpc)
-{
- const query_backup_status_request &request = rpc.request();
- query_backup_status_response &response = rpc.response();
- auto log_on_failed = dsn::defer([&response]() {
- if (!response.hint_message.empty()) {
- LOG_WARNING(response.hint_message);
- }
- });
-
- int32_t app_id = request.app_id;
- {
- zauto_lock l(_lock);
- for (const auto &backup : _backup_states) {
- if (app_id == backup->get_backup_app_id() &&
- (!request.__isset.backup_id ||
- request.backup_id == backup->get_current_backup_id())) {
- response.backup_items.emplace_back(backup->get_backup_item());
- }
- }
- }
-
- if (response.backup_items.empty()) {
- response.err = ERR_INVALID_PARAMETERS;
- response.hint_message = "Backup not found, please check app_id or backup_id.";
- return;
- }
- response.__isset.backup_items = true;
- response.hint_message = fmt::format(
- "There are {} available backups for app {}.", response.backup_items.size(), app_id);
- response.err = ERR_OK;
-}
+// TODO(heyuchen): implement it; this empty stub ignores the request and sets no response.
+void backup_service::query_backup_status(query_backup_status_rpc rpc) {}
} // namespace replication
} // namespace dsn
diff --git a/src/meta/meta_backup_service.h b/src/meta/meta_backup_service.h
index f767ad2..bf9b89f 100644
--- a/src/meta/meta_backup_service.h
+++ b/src/meta/meta_backup_service.h
@@ -17,433 +17,48 @@
#pragma once
-#include <gtest/gtest_prod.h>
-#include <algorithm>
-#include <atomic>
-#include <chrono>
-#include <cstdint>
-#include <cstdio>
-#include <functional>
-#include <iomanip> // std::setfill, std::setw
-#include <map>
-#include <memory>
-#include <set>
-#include <sstream>
#include <string>
-#include <vector>
-#include "backup_types.h"
-#include "common/gpid.h"
-#include "common/json_helper.h"
-#include "common/replication_other_types.h"
#include "meta_rpc_types.h"
-#include "runtime/task/task.h"
-#include "runtime/task/task_tracker.h"
-#include "utils/api_utilities.h"
-#include "utils/autoref_ptr.h"
-#include "utils/error_code.h"
-#include "utils/fmt_utils.h"
-#include "utils/metrics.h"
-#include "utils/ports.h"
-#include "utils/zlocks.h"
namespace dsn {
-class message_ex;
-class host_port;
-
-namespace dist {
-namespace block_service {
-class block_filesystem;
-} // namespace block_service
-} // namespace dist
namespace replication {
-class backup_engine;
-class backup_service;
class meta_service;
class server_state;
-struct backup_info_status
-{
- enum type
- {
- ALIVE = 1, // backup info is preserved
-
- DELETING = 2 // backup info is under deleting, should check whether backup checkpoint is
- // fully removed on backup media, then remove the backup_info on remote storage
- };
-};
-
-struct backup_info
-{
- int64_t backup_id;
- int64_t start_time_ms;
- int64_t end_time_ms;
-
- // "app_ids" is copied from policy.app_ids when
- // a new backup is generated. The policy's
- // app set may be changed, but backup_info.app_ids
- // never change.
- std::set<int32_t> app_ids;
- std::map<int32_t, std::string> app_names;
- int32_t info_status;
- backup_info_status::type get_backup_status() const
- {
- return backup_info_status::type(info_status);
- }
- backup_info()
- : backup_id(0), start_time_ms(0), end_time_ms(0), info_status(backup_info_status::ALIVE)
- {
- }
- DEFINE_JSON_SERIALIZATION(
- backup_id, start_time_ms, end_time_ms, app_ids, app_names, info_status)
-};
-
-// Attention: backup_start_time == 24:00 is represent no limit for start_time, 24:00 is mainly saved
-// for testing
-//
-// current, we don't support accurating to minute, only support accurating to hour, so
-// we just set minute to 0
-struct backup_start_time
-{
- int32_t hour; // [0 ~24)
- int32_t minute; // [0 ~ 60)
- backup_start_time() : hour(0), minute(0) {}
- backup_start_time(int32_t h, int32_t m) : hour(h), minute(m) {}
- std::string to_string() const
- {
- std::stringstream ss;
- ss << std::setw(2) << std::setfill('0') << std::to_string(hour) << ":" << std::setw(2)
- << std::setfill('0') << std::to_string(minute);
- return ss.str();
- }
- friend std::ostream &operator<<(std::ostream &os, const backup_start_time &t)
- {
- return os << t.to_string();
- }
- // NOTICE: this function will modify hour and minute, if time is invalid, this func will set
- // hour = 24, minute = 0
- bool parse_from(const std::string &time)
- {
- if (::sscanf(time.c_str(), "%d:%d", &hour, &minute) != 2) {
- return false;
- } else {
- if (hour > 24) {
- hour = 24;
- minute = 0;
- return false;
- }
-
- if (hour == 24 && minute != 0) {
- minute = 0;
- return false;
- }
-
- if (minute >= 60) {
- hour = 24;
- minute = 0;
- return false;
- }
- }
- return true;
- }
-
- // return the interval between new_hour:new_min and start_time,
- // namely new_hour:new_min - start_time;
- // unit is ms
- int64_t compute_time_drift_ms(int32_t new_hour, int32_t new_min)
- {
- int64_t res = 0;
- // unit is hour
- res += (new_hour - hour);
- // unit is minute
- res *= 60;
- res += (new_min - minute);
- // unit is ms
- return (res * 60 * 1000);
- }
-
- // judge whether we should start backup base current time
- bool should_start_backup(int32_t cur_hour, int32_t cur_min)
- {
- if (hour == 24) {
- // erase the restrict of backup_start_time, just for testing
- return true;
- }
- // NOTICE : if you want more precisely, you can use cur_min to implement
- // now, we just ignore
- return (cur_hour == hour);
- }
- DEFINE_JSON_SERIALIZATION(hour, minute)
-};
-
-class backup_policy_metrics
-{
-public:
- backup_policy_metrics() = default;
- backup_policy_metrics(const std::string &policy_name);
-
- const metric_entity_ptr &backup_policy_metric_entity() const;
-
- METRIC_DEFINE_SET(backup_recent_duration_ms, int64_t)
-
-private:
- const std::string _policy_name;
- const metric_entity_ptr _backup_policy_metric_entity;
- METRIC_VAR_DECLARE_gauge_int64(backup_recent_duration_ms);
-
- DISALLOW_COPY_AND_ASSIGN(backup_policy_metrics);
-};
-
-//
-// the backup process of meta server:
-// 1, write the app metadata to block filesystem
-// 2, tell the primary of each partition periodically to start backup until app finish backup
-// 3, receive the backup response from each primary to judge whether backup is finished
-// 4, if one app finish its backup, write a flag to block filesystem(we write a file named
-// app_backup_status to represent the flag) to represent it has finished backup
-// 5, if policy finished backup, write the backup information (backup_info) to block filesystem
-// 6, backup is finished, we just wait to start another backup
-//
-
-class policy : public policy_info
-{
-public:
- std::set<int32_t> app_ids;
- std::map<int32_t, std::string> app_names;
- int64_t backup_interval_seconds;
- int32_t backup_history_count_to_keep;
- bool is_disable;
- backup_start_time start_time;
-
- policy()
- : app_ids(),
- backup_interval_seconds(0),
- backup_history_count_to_keep(6),
- is_disable(false),
- start_time(24, 0) // default is 24:00, namely no limit
- {
- }
-
- DEFINE_JSON_SERIALIZATION(policy_name,
- backup_provider_type,
- app_ids,
- app_names,
- backup_interval_seconds,
- backup_history_count_to_keep,
- is_disable,
- start_time)
-};
-
-struct backup_progress
-{
- int32_t unfinished_apps;
- std::map<gpid, int32_t> partition_progress;
- std::map<app_id, int32_t> unfinished_partitions_per_app;
- // <app_id, <partition_id, checkpoint size>>
- std::map<app_id, std::map<int, int64_t>> app_chkpt_size;
- // if app is dropped when starting a new backup or under backuping, we just skip backup this app
- std::map<app_id, bool> is_app_skipped;
-
- backup_progress() : unfinished_apps(0) {}
-
- void reset()
- {
- unfinished_apps = 0;
- partition_progress.clear();
- unfinished_partitions_per_app.clear();
- app_chkpt_size.clear();
- is_app_skipped.clear();
- }
-};
-
-struct backup_flag
-{
- int64_t total_checkpoint_size;
- DEFINE_JSON_SERIALIZATION(total_checkpoint_size)
-};
-
-class policy_context
-{
-public:
- explicit policy_context(backup_service *service)
- : _backup_service(service), _block_service(nullptr)
- {
- }
- mock_virtual ~policy_context() {}
-
- void set_policy(const policy &p);
- policy get_policy();
- void add_backup_history(const backup_info &info);
- std::vector<backup_info> get_backup_infos(int cnt);
- bool is_under_backuping();
- mock_virtual void start();
- // function above will called be others, before call these function, should lock the _lock of
- // policy_context, otherwise maybe lead deadlock
-
- // clang-format off
-mock_private :
- //
- // update the partition progress
- // the progress for app and whole-backup-instance will also updated accordingly.
- // if whole-backup-instance is finished, sync it to the remote storage.
- // NOTICE: the local "_cur_backup" is reset after it is successfully synced to remote,
- // which is in another task.
- // so user can safely visit "_cur_backup" after this function call,
- // as long as the _lock is held.
- //
- // Return: true if the partition is finished, or-else false
- //
-
- mock_virtual bool
- update_partition_progress_unlocked(gpid pid, int32_t progress, const host_port &source);
- mock_virtual void record_partition_checkpoint_size_unlock(const gpid& pid, int64_t size);
-
- mock_virtual void start_backup_app_meta_unlocked(int32_t app_id);
- mock_virtual void start_backup_app_partitions_unlocked(int32_t app_id);
- mock_virtual void start_backup_partition_unlocked(gpid pid);
- // before finish backup one app, we write a flag file to represent whether the app's backup is
- // finished
- mock_virtual void write_backup_app_finish_flag_unlocked(int32_t app_id,
- dsn::task_ptr write_callback);
- mock_virtual void finish_backup_app_unlocked(int32_t app_id);
- // after finish backup all app, we record the information of policy's backup to block filesystem
- mock_virtual void write_backup_info_unlocked(const backup_info &b_info,
- dsn::task_ptr write_callback);
-
- mock_virtual void sync_backup_to_remote_storage_unlocked(const backup_info &b_info,
- dsn::task_ptr sync_callback,
- bool create_new_node);
- mock_virtual void initialize_backup_progress_unlocked();
- mock_virtual void prepare_current_backup_on_new_unlocked();
- mock_virtual void issue_new_backup_unlocked();
- // returns:
- // - true, should start backup right now, otherwise don't start backup
- mock_virtual bool should_start_backup_unlocked();
- mock_virtual void continue_current_backup_unlocked();
-
- mock_virtual void on_backup_reply(dsn::error_code err,
- backup_response &&response,
- gpid pid,
- const host_port &primary);
-
- mock_virtual void gc_backup_info_unlocked(const backup_info &info_to_gc);
- mock_virtual void issue_gc_backup_info_task_unlocked();
- mock_virtual void sync_remove_backup_info(const backup_info &info, dsn::task_ptr sync_callback);
-
-mock_private :
- friend class backup_service;
- backup_service *_backup_service;
-
- // lock the data-structure below
- dsn::zlock _lock;
-
- // policy related
- policy _policy;
- dist::block_service::block_filesystem *_block_service;
-
- // backup related
- backup_info _cur_backup;
- bool _is_backup_failed;
- // backup_id --> backup_info
- std::map<int64_t, backup_info> _backup_history;
- backup_progress _progress;
- std::string _backup_sig; // policy_name@backup_id, used when print backup related log
-
- std::unique_ptr<backup_policy_metrics> _metrics;
-
- //clang-format on
- dsn::task_tracker _tracker;
-};
-
+// TODO(heyuchen): implement it
class backup_service
{
public:
- struct backup_opt
- {
- std::chrono::milliseconds meta_retry_delay_ms;
- std::chrono::milliseconds block_retry_delay_ms;
- std::chrono::milliseconds app_dropped_retry_delay_ms;
- std::chrono::milliseconds reconfiguration_retry_delay_ms;
- std::chrono::milliseconds request_backup_period_ms; // period that meta send backup command to replica
- std::chrono::milliseconds issue_backup_interval_ms; // interval that meta try to issue a new backup
- };
-
- typedef std::function<std::shared_ptr<policy_context>(backup_service *)> policy_factory;
explicit backup_service(meta_service *meta_svc,
const std::string &policy_meta_root,
- const std::string &backup_root,
- const policy_factory &factory);
- meta_service *get_meta_service() const { return _meta_svc; }
- server_state *get_state() const { return _state; }
- backup_opt &backup_option() { return _opt; }
+ const std::string &backup_root);
void start();
- const std::string &backup_root() const { return _backup_root; }
- const std::string &policy_root() const { return _policy_meta_root; }
- void add_backup_policy(dsn::message_ex* msg);
- void query_backup_policy(query_backup_policy_rpc rpc);
- void modify_backup_policy(configuration_modify_backup_policy_rpc rpc);
void start_backup_app(start_backup_app_rpc rpc);
void query_backup_status(query_backup_status_rpc rpc);
- // compose the absolute path(AP) for policy
- // input:
- // -- root: the prefix of the AP
- // return:
- // the AP of this policy: <policy_meta_root>/<policy_name>
- std::string get_policy_path(const std::string &policy_name);
- // compose the absolute path(AP) for backup
- // input:
- // -- root: the prefix of the AP
- // return:
- // the AP of this backup: <policy_meta_root>/<policy_name>/<backup_id>
- std::string get_backup_path(const std::string &policy_name, int64_t backup_id);
+ meta_service *get_meta_service() const { return _meta_svc; }
+ server_state *get_state() const { return _state; }
+
+ const std::string &backup_root() const { return _backup_root; }
+ const std::string &policy_root() const { return _policy_meta_root; }
private:
- friend class backup_service_test;
- friend class meta_service_test_app;
+ friend class backup_engine;
- FRIEND_TEST(backup_service_test, test_init_backup);
- FRIEND_TEST(backup_service_test, test_query_backup_status);
- FRIEND_TEST(meta_backup_service_test, test_add_backup_policy);
-
- void start_create_policy_meta_root(dsn::task_ptr callback);
- void start_sync_policies();
- error_code sync_policies_from_remote_storage();
-
- void do_add_policy(dsn::message_ex* req,
- std::shared_ptr<policy_context> p,
- const std::string &hint_msg);
- void do_update_policy_to_remote_storage(configuration_modify_backup_policy_rpc rpc,
- const policy &p,
- std::shared_ptr<policy_context> &p_context_ptr);
-
- bool is_valid_policy_name_unlocked(const std::string &policy_name);
-
- policy_factory _factory;
meta_service *_meta_svc;
server_state *_state;
- // lock _policy_states and _backup_states.
- zlock _lock;
- std::map<std::string, std::shared_ptr<policy_context>>
- _policy_states; // policy_name -> policy_context
-
- // _backup_states stores all states of one-time backup in the cluster, not persistence to ZK
- std::vector<std::shared_ptr<backup_engine>> _backup_states;
-
// the root of policy metas, stored on remote_storage(zookeeper)
std::string _policy_meta_root;
// the root of cold backup data, stored on block service
std::string _backup_root;
-
- backup_opt _opt;
- std::atomic_bool _in_initialize;
- dsn::task_tracker _tracker;
};
+
} // namespace replication
} // namespace dsn
-USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::backup_start_time);
+// USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::backup_start_time);
diff --git a/src/meta/meta_http_service.cpp b/src/meta/meta_http_service.cpp
index ba5bc9b..3e343b7 100644
--- a/src/meta/meta_http_service.cpp
+++ b/src/meta/meta_http_service.cpp
@@ -26,7 +26,6 @@
#include <unordered_map>
#include <utility>
-#include "backup_types.h"
#include "bulk_load_types.h"
#include "common//duplication_common.h"
#include "common/bulk_load_common.h"
@@ -39,9 +38,7 @@
#include "duplication_types.h"
#include "http/http_status_code.h"
#include "meta/duplication/meta_duplication_service.h"
-#include "meta/meta_backup_service.h"
#include "meta/meta_bulk_load_service.h"
-#include "meta/meta_rpc_types.h"
#include "meta/meta_service.h"
#include "meta_admin_types.h"
#include "meta_http_service.h"
@@ -553,54 +550,8 @@
return out.str();
}
-void meta_http_service::query_backup_policy_handler(const http_request &req, http_response &resp)
-{
- if (!redirect_if_not_primary(req, resp))
- return;
-
- if (_service->_backup_handler == nullptr) {
- resp.body = "cold_backup_disabled";
- resp.status_code = http_status_code::kNotFound;
- return;
- }
- auto request = std::make_unique<configuration_query_backup_policy_request>();
- std::vector<std::string> policy_names;
- for (const auto &p : req.query_args) {
- if (p.first == "name") {
- policy_names.push_back(p.second);
- } else {
- resp.body = "Invalid parameter";
- resp.status_code = http_status_code::kBadRequest;
- return;
- }
- }
- request->policy_names = std::move(policy_names);
- query_backup_policy_rpc http_to_rpc(std::move(request), LPC_DEFAULT_CALLBACK);
- _service->_backup_handler->query_backup_policy(http_to_rpc);
- auto rpc_return = http_to_rpc.response();
-
- dsn::utils::table_printer tp_query_backup_policy;
- tp_query_backup_policy.add_title("name");
- tp_query_backup_policy.add_column("backup_provider_type");
- tp_query_backup_policy.add_column("backup_interval");
- tp_query_backup_policy.add_column("app_ids");
- tp_query_backup_policy.add_column("start_time");
- tp_query_backup_policy.add_column("status");
- tp_query_backup_policy.add_column("backup_history_count");
- for (const auto &cur_policy : rpc_return.policys) {
- tp_query_backup_policy.add_row(cur_policy.policy_name);
- tp_query_backup_policy.append_data(cur_policy.backup_provider_type);
- tp_query_backup_policy.append_data(cur_policy.backup_interval_seconds);
- tp_query_backup_policy.append_data(set_to_string(cur_policy.app_ids));
- tp_query_backup_policy.append_data(cur_policy.start_time);
- tp_query_backup_policy.append_data(cur_policy.is_disable ? "disabled" : "enabled");
- tp_query_backup_policy.append_data(cur_policy.backup_history_count_to_keep);
- }
- std::ostringstream out;
- tp_query_backup_policy.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
- resp.body = out.str();
- resp.status_code = http_status_code::kOk;
-}
+// TODO(heyuchen): implement it; until then answer like the old handler did when no backup service existed
+void meta_http_service::query_backup_policy_handler(const http_request &req, http_response &resp) { if (redirect_if_not_primary(req, resp)) { resp.body = "cold_backup_disabled"; resp.status_code = http_status_code::kNotFound; } }
void meta_http_service::query_duplication_handler(const http_request &req, http_response &resp)
{
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index 8c2b38b..813122e 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -440,8 +440,7 @@
_backup_handler = std::make_shared<backup_service>(
this,
utils::filesystem::concat_path_unix_style(_cluster_root, "backup"),
- FLAGS_cold_backup_root,
- [](backup_service *bs) { return std::make_shared<policy_context>(bs); });
+ FLAGS_cold_backup_root);
}
_bulk_load_svc = std::make_unique<bulk_load_service>(
@@ -902,59 +901,14 @@
LPC_RESTORE_BACKGROUND, nullptr, std::bind(&server_state::restore_app, _state.get(), req));
}
-void meta_service::on_add_backup_policy(dsn::message_ex *req)
-{
- configuration_add_backup_policy_response response;
- if (!check_status_and_authz_with_reply(req, response)) {
- return;
- }
+// TODO(heyuchen): implement it; until then reply ERR_SERVICE_NOT_ACTIVE so the request is not dropped without a reply
+void meta_service::on_add_backup_policy(dsn::message_ex *req) { configuration_add_backup_policy_response response; if (check_status_and_authz_with_reply(req, response)) { response.err = ERR_SERVICE_NOT_ACTIVE; reply(req, response); } }
- if (_backup_handler == nullptr) {
- LOG_ERROR("meta doesn't enable backup service");
- response.err = ERR_SERVICE_NOT_ACTIVE;
- reply(req, response);
- } else {
- req->add_ref();
- tasking::enqueue(LPC_DEFAULT_CALLBACK,
- nullptr,
- std::bind(&backup_service::add_backup_policy, _backup_handler.get(), req));
- }
-}
+// TODO(heyuchen): implement it; until then keep the status/authz check and report ERR_SERVICE_NOT_ACTIVE
+void meta_service::on_query_backup_policy(query_backup_policy_rpc policy_rpc) { if (check_status_and_authz(policy_rpc)) { policy_rpc.response().err = ERR_SERVICE_NOT_ACTIVE; } }
-void meta_service::on_query_backup_policy(query_backup_policy_rpc policy_rpc)
-{
- if (!check_status_and_authz(policy_rpc)) {
- return;
- }
-
- auto &response = policy_rpc.response();
- if (_backup_handler == nullptr) {
- LOG_ERROR("meta doesn't enable backup service");
- response.err = ERR_SERVICE_NOT_ACTIVE;
- } else {
- tasking::enqueue(
- LPC_DEFAULT_CALLBACK,
- nullptr,
- std::bind(&backup_service::query_backup_policy, _backup_handler.get(), policy_rpc));
- }
-}
-
-void meta_service::on_modify_backup_policy(configuration_modify_backup_policy_rpc rpc)
-{
- if (!check_status_and_authz(rpc)) {
- return;
- }
-
- if (_backup_handler == nullptr) {
- LOG_ERROR("meta doesn't enable backup service");
- rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
- } else {
- tasking::enqueue(
- LPC_DEFAULT_CALLBACK,
- nullptr,
- std::bind(&backup_service::modify_backup_policy, _backup_handler.get(), rpc));
- }
-}
+// TODO(heyuchen): implement it; until then keep the status/authz check and report ERR_SERVICE_NOT_ACTIVE
+void meta_service::on_modify_backup_policy(configuration_modify_backup_policy_rpc rpc) { if (check_status_and_authz(rpc)) { rpc.response().err = ERR_SERVICE_NOT_ACTIVE; } }
void meta_service::on_report_restore_status(configuration_report_restore_status_rpc rpc)
{
@@ -1256,27 +1210,11 @@
server_state::sStateHash);
}
-void meta_service::on_start_backup_app(start_backup_app_rpc rpc)
-{
- CHECK_APP_ID_STATUS_AND_AUTHZ(rpc.request().app_id);
- if (_backup_handler == nullptr) {
- LOG_ERROR("meta doesn't enable backup service");
- rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
- return;
- }
- _backup_handler->start_backup_app(std::move(rpc));
-}
+// TODO(heyuchen): implement it; until then keep the app-id/status/authz check and report ERR_SERVICE_NOT_ACTIVE
+void meta_service::on_start_backup_app(start_backup_app_rpc rpc) { CHECK_APP_ID_STATUS_AND_AUTHZ(rpc.request().app_id); rpc.response().err = ERR_SERVICE_NOT_ACTIVE; }
-void meta_service::on_query_backup_status(query_backup_status_rpc rpc)
-{
- CHECK_APP_ID_STATUS_AND_AUTHZ(rpc.request().app_id);
- if (_backup_handler == nullptr) {
- LOG_ERROR("meta doesn't enable backup service");
- rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
- return;
- }
- _backup_handler->query_backup_status(std::move(rpc));
-}
+// TODO(heyuchen): implement it; until then keep the app-id/status/authz check and report ERR_SERVICE_NOT_ACTIVE
+void meta_service::on_query_backup_status(query_backup_status_rpc rpc) { CHECK_APP_ID_STATUS_AND_AUTHZ(rpc.request().app_id); rpc.response().err = ERR_SERVICE_NOT_ACTIVE; }
size_t meta_service::get_alive_node_count() const
{
diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h
index 1db53b8..85b24aa 100644
--- a/src/meta/meta_service.h
+++ b/src/meta/meta_service.h
@@ -319,11 +319,12 @@
bool check_freeze() const;
private:
- friend class backup_engine_test;
- friend class backup_service_test;
+ // TODO(heyuchen): update it
+ // friend class backup_engine_test;
+ // friend class backup_service_test;
friend class bulk_load_service_test;
- friend class meta_backup_service_test;
- friend class meta_backup_test_base;
+ // friend class meta_backup_service_test;
+ // friend class meta_backup_test_base;
friend class meta_duplication_service;
friend class meta_http_service;
friend class meta_http_service_test;
diff --git a/src/meta/test/backup_test.cpp b/src/meta/test/backup_test.cpp
deleted file mode 100644
index 21af00e..0000000
--- a/src/meta/test/backup_test.cpp
+++ /dev/null
@@ -1,884 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <fmt/core.h>
-#include <unistd.h>
-#include <atomic>
-#include <chrono>
-#include <cstdint>
-#include <iostream>
-#include <map>
-#include <memory>
-#include <set>
-#include <string>
-#include <thread>
-#include <utility>
-#include <vector>
-
-#include "backup_types.h"
-#include "common/gpid.h"
-#include "common/replication.codes.h"
-#include "dsn.layer2_types.h"
-#include "gtest/gtest.h"
-#include "meta/meta_backup_service.h"
-#include "meta/meta_data.h"
-#include "meta/meta_service.h"
-#include "meta/meta_state_service.h"
-#include "meta/server_state.h"
-#include "meta/test/misc/misc.h"
-#include "meta_service_test_app.h"
-#include "meta_test_base.h"
-#include "runtime/api_layer1.h"
-#include "runtime/rpc/rpc_address.h"
-#include "runtime/rpc/rpc_holder.h"
-#include "runtime/rpc/rpc_host_port.h"
-#include "runtime/rpc/rpc_message.h"
-#include "runtime/rpc/serialization.h"
-#include "runtime/task/async_calls.h"
-#include "runtime/task/task.h"
-#include "runtime/task/task_code.h"
-#include "utils/autoref_ptr.h"
-#include "utils/chrono_literals.h"
-#include "utils/error_code.h"
-#include "utils/fail_point.h"
-#include "utils/flags.h"
-#include "utils/fmt_logging.h"
-#include "utils/synchronize.h"
-#include "utils/time_utils.h"
-#include "utils/zlocks.h"
-
-DSN_DECLARE_int32(cold_backup_checkpoint_reserve_minutes);
-DSN_DECLARE_string(cluster_root);
-DSN_DECLARE_string(meta_state_service_type);
-
-namespace dsn {
-namespace replication {
-class meta_options;
-class mock_policy;
-
-struct method_record
-{
- dsn::utils::notify_event event;
- int count;
- int max_call_count;
- // whether the event will be triggered when count==max_call_count
- bool trigger_beyond;
-
- method_record() : event(), count(0), max_call_count(1000000), trigger_beyond(true) {}
-};
-
-class mock_base
-{
-public:
- void reset_records() { _records.clear(); }
-
-protected:
- std::map<std::string, method_record> _records;
-};
-
-#define MOCK_ADD_RECORD(records, method_name) records[#method_name] = method_record()
-#define MOCK_HELPER_FUNCS(method_name) \
- int &counter_##method_name() { return _records[#method_name].count; } \
- dsn::utils::notify_event &notifier_##method_name() { return _records[#method_name].event; } \
- int maxcall_##method_name() { return _records[#method_name].max_call_count; } \
- void set_maxcall_##method_name(int callcount) \
- { \
- _records[#method_name].max_call_count = callcount; \
- } \
- bool &trigger_beyond_##method_name() { return _records[#method_name].trigger_beyond; }
-
-#define DEFINE_MOCK0(base_class, method_name) \
- MOCK_HELPER_FUNCS(method_name) \
- void method_name() \
- { \
- LOG_INFO("{} is called", #method_name); \
- int &c = counter_##method_name(); \
- ++c; \
- int max_call = maxcall_##method_name(); \
- if (c <= max_call) { \
- base_class::method_name(); \
- } \
- if (c > max_call || (c == max_call && !trigger_beyond_##method_name())) { \
- notifier_##method_name().notify(); \
- } \
- }
-
-#define DEFINE_MOCK1(base_class, method_name, type1) \
- MOCK_HELPER_FUNCS(method_name) \
- void method_name(type1 arg1) \
- { \
- LOG_INFO("{} is called", #method_name); \
- int &c = counter_##method_name(); \
- ++c; \
- int max_call = maxcall_##method_name(); \
- if (c <= max_call) { \
- base_class::method_name(arg1); \
- } \
- if (c > max_call || (c == max_call && !trigger_beyond_##method_name())) { \
- notifier_##method_name().notify(); \
- } \
- }
-
-#define DEFINE_MOCK2(base_class, method_name, type1, type2) \
- MOCK_HELPER_FUNCS(method_name) \
- void method_name(type1 arg1, type2 arg2) \
- { \
- LOG_INFO("{} is called", #method_name); \
- int &c = counter_##method_name(); \
- ++c; \
- int max_call = maxcall_##method_name(); \
- if (c <= max_call) { \
- base_class::method_name(arg1, arg2); \
- } \
- if (c > max_call || (c == max_call && !trigger_beyond_##method_name())) { \
- notifier_##method_name().notify(); \
- } \
- }
-
-#define DEFINE_MOCK3(base_class, method_name, type1, type2, type3) \
- MOCK_HELPER_FUNCS(method_name) \
- void method_name(type1 arg1, type2 arg2, type3, arg3) \
- { \
- LOG_INFO("{} is called", #method_name); \
- int &c = counter_##method_name(); \
- ++c; \
- int max_call = maxcall_##method_name(); \
- if (c <= max_call) { \
- base_class::method_name(arg1, arg2, arg3); \
- } \
- if (c > max_call || (c == max_call && !trigger_beyond_##method_name())) { \
- notifier_##method_name().notify(); \
- } \
- }
-
-class mock_policy : public policy_context, public mock_base
-{
-public:
- mock_policy(backup_service *bs) : policy_context(bs) {}
- DEFINE_MOCK0(policy_context, issue_new_backup_unlocked)
- DEFINE_MOCK0(policy_context, continue_current_backup_unlocked)
- DEFINE_MOCK1(policy_context, start_backup_app_meta_unlocked, int32_t)
- DEFINE_MOCK1(policy_context, finish_backup_app_unlocked, int32_t)
- DEFINE_MOCK2(policy_context, write_backup_app_finish_flag_unlocked, int32_t, dsn::task_ptr)
-
- MOCK_HELPER_FUNCS(start)
- void start()
- {
- ++counter_start();
- notifier_start().notify();
- }
-};
-
-class progress_liar : public meta_service
-{
-public:
- // req is held by callback, we don't need to handle the life-time of it
- virtual void send_request(dsn::message_ex *req,
- const host_port &target,
- const rpc_response_task_ptr &callback)
- {
- // need to handle life-time manually
- dsn::message_ex *recved_req = create_corresponding_receive(req);
-
- backup_request b_req;
- dsn::unmarshall(recved_req, b_req);
-
- backup_response b_resp;
- b_resp.backup_id = b_req.backup_id;
- b_resp.err = dsn::ERR_OK;
- b_resp.pid = b_req.pid;
- b_resp.policy_name = b_req.policy.policy_name;
- b_resp.progress = check_progress(b_req.pid);
-
- // need to handle life-time manually
- dsn::message_ex *response_for_send = recved_req->create_response();
- dsn::marshall(response_for_send, b_resp);
-
- // life time is handled by callback
- dsn::message_ex *response_for_receive = create_corresponding_receive(response_for_send);
- callback->enqueue(dsn::ERR_OK, (dsn::message_ex *)response_for_receive);
-
- destroy_message(recved_req);
- destroy_message(response_for_send);
- }
-
- int32_t check_progress(const gpid &pid)
- {
- if (progress.find(pid) == progress.end()) {
- progress[pid] = 500;
- } else if (progress[pid] == 500) {
- progress[pid] = 250;
- } else if (progress[pid] == 250) {
- progress[pid] = 1000;
- }
-
- return progress[pid];
- }
-
-private:
- std::map<gpid, int32_t> progress;
-};
-
-static const std::string test_policy_name = "test_policy_name";
-
-class policy_context_test : public meta_test_base
-{
-protected:
- policy_context_test() : _service(new progress_liar()), _mp(nullptr) {}
-
- void SetUp() override
- {
- meta_test_base::SetUp();
-
- dsn::error_code ec = _service->remote_storage_initialize();
- ASSERT_EQ(ec, dsn::ERR_OK);
- _service->_started = true;
- _service->_backup_handler =
- std::make_shared<backup_service>(_service.get(), policy_root, ".", nullptr);
- _service->_backup_handler->backup_option().app_dropped_retry_delay_ms = 500_ms;
- _service->_backup_handler->backup_option().request_backup_period_ms = 20_ms;
- _service->_backup_handler->backup_option().issue_backup_interval_ms = 1000_ms;
- _service->_storage
- ->create_node(
- policy_root, dsn::TASK_CODE_EXEC_INLINED, [&ec](dsn::error_code err) { ec = err; })
- ->wait();
- ASSERT_EQ(dsn::ERR_OK, ec);
-
- _policy.policy_name = test_policy_name;
- _policy.is_disable = false;
- _policy.backup_interval_seconds = 5;
- _policy.backup_provider_type = "local_service";
- _policy.start_time = backup_start_time(24, 0);
- _policy.app_ids = {1, 2, 3, 4, 6};
- _policy.app_names[1] = "app1";
- _policy.app_names[2] = "app2";
- _policy.app_names[3] = "app3";
- _policy.app_names[4] = "app4";
- _policy.app_names[6] = "app6";
- _mp._backup_service = _service->_backup_handler.get();
- _mp.set_policy(_policy);
-
- _service->_storage
- ->create_node(
- policy_dir, dsn::TASK_CODE_EXEC_INLINED, [&ec](dsn::error_code err) { ec = err; })
- ->wait();
- ASSERT_EQ(dsn::ERR_OK, ec);
- }
-
- const std::string policy_root = "/test";
- const std::string policy_dir = "/test/" + test_policy_name;
-
- std::shared_ptr<meta_service> _service;
- mock_policy _mp;
- policy _policy;
-};
-
-TEST_F(policy_context_test, test_app_dropped_during_backup)
-{
- int64_t time_before_backup = static_cast<int64_t>(dsn_now_ms());
- server_state *state = _service->get_server_state();
-
- {
- // Prepare: backup_history is empty, all apps are deleted.
- // Result: we can't get continue-curr called, issue will be recalled again
- std::cout << "issue a backup, but no app is available" << std::endl;
-
- {
- zauto_lock l(_mp._lock);
- _mp.set_maxcall_issue_new_backup_unlocked(2);
- _mp.issue_new_backup_unlocked();
- }
-
- ASSERT_TRUE(_mp.notifier_issue_new_backup_unlocked().wait_for(5000));
-
- {
- zauto_lock l(_mp._lock);
- ASSERT_EQ(0, _mp.counter_continue_current_backup_unlocked());
- ASSERT_LE(time_before_backup, _mp._cur_backup.backup_id);
- ASSERT_EQ(_policy.app_ids, _mp._cur_backup.app_ids);
- ASSERT_NE(0, _mp._cur_backup.start_time_ms);
- ASSERT_TRUE(_mp._progress.unfinished_partitions_per_app.empty());
- ASSERT_EQ(_policy.app_ids.size(), _mp._progress.unfinished_apps);
- ASSERT_LE(test_policy_name + std::string("@") + std::to_string(time_before_backup),
- _mp._backup_sig);
- }
- }
-
- {
- // Prepare: backup_history is empty
- // not all apps are deleted.
- // Result: we can get continue-curr called
- std::cout << "issue a new backup without backup histories" << std::endl;
- dsn::app_info info;
- info.is_stateful = true;
- info.app_id = 3;
- info.app_type = "simple_kv";
- info.max_replica_count = 3;
- info.partition_count = 32;
- info.status = dsn::app_status::AS_AVAILABLE;
- state->_all_apps.emplace(info.app_id, app_state::create(info));
-
- {
- zauto_lock l(_mp._lock);
- _mp.reset_records();
- _mp.set_maxcall_continue_current_backup_unlocked(0);
- _mp.issue_new_backup_unlocked();
- }
-
- ASSERT_TRUE(_mp.notifier_continue_current_backup_unlocked().wait_for(5000));
-
- {
- zauto_lock l(_mp._lock);
- ASSERT_EQ(_policy.app_ids.size(), _mp._progress.unfinished_apps);
- ASSERT_EQ(1, _mp._progress.unfinished_partitions_per_app.size());
- ASSERT_EQ(info.app_id, _mp._progress.unfinished_partitions_per_app.begin()->first);
- ASSERT_EQ(info.partition_count,
- _mp._progress.unfinished_partitions_per_app.begin()->second);
- ASSERT_EQ(info.partition_count, _mp._progress.partition_progress.size());
- }
- }
-
- {
- // test cases
- // Prepare: backup_history isn't empty,
- // all apps are unavailable,
- // we will reach next backup time 500ms later
- // Result: issue called 3 times
- std::cout << "issue a new backup later" << std::endl;
-
- backup_info info;
- info.app_ids = {1, 2, 3};
-
- info.start_time_ms = dsn_now_ms() - (_policy.backup_interval_seconds + 20) * 1000 - 500;
- info.end_time_ms = info.start_time_ms + 10;
- info.backup_id = info.start_time_ms;
- _mp.add_backup_history(info);
-
- info.start_time_ms += 10000;
- info.end_time_ms += 10000;
- info.backup_id = info.start_time_ms;
- _mp.add_backup_history(info);
-
- // the start time for recent backup is 500ms ago
- info.start_time_ms += 10000;
- info.end_time_ms += 10000;
- info.backup_id = info.start_time_ms;
- _mp.add_backup_history(info);
-
- {
- zauto_lock l(_mp._lock);
- _mp.reset_records();
- // issue by test -> issue by period delay -> issue by dropped retry ->
- // issue by dropped retry
- _mp.set_maxcall_issue_new_backup_unlocked(4);
- state->_all_apps[3]->status = dsn::app_status::AS_DROPPED;
-
- _mp.issue_new_backup_unlocked();
- }
- // we mark all apps as dropped, so reissue will be triggered
- ASSERT_TRUE(_mp.notifier_issue_new_backup_unlocked().wait_for(20000));
-
- {
- int64_t start_time_ms_of_sixth_backup =
- info.start_time_ms + _policy.backup_interval_seconds * 1000;
- zauto_lock l(_mp._lock);
- ASSERT_LE(start_time_ms_of_sixth_backup, _mp._cur_backup.backup_id);
- ASSERT_EQ(_policy.app_ids, _mp._cur_backup.app_ids);
-
- // every time intialize backup, the progress will be reset
- ASSERT_TRUE(_mp._progress.unfinished_partitions_per_app.empty());
- ASSERT_TRUE(_mp._progress.partition_progress.empty());
- ASSERT_EQ(_policy.app_ids.size(), _mp._progress.unfinished_apps);
- ASSERT_LE(test_policy_name + "@" + std::to_string(start_time_ms_of_sixth_backup),
- _mp._backup_sig);
- }
- }
-
- {
- // test case: continue current backup unlocked
- // Prepare: app 3 is available
- // clear the backup list
- // call continue_current_backup_unlocked.
- // Result: app {1, 2, 4, 6} will treat as finished, both finish_backup_app_unlocked
- // and write_backup_app_finish_flag_unlocked will be called 4 times.
- // start_backup_app_meta is called for app 3, only called once,
- // as app 3 won't be finished, so the backup can't finish
- std::cout << "continue backup, only some apps are available " << std::endl;
- {
- zauto_lock l(_mp._lock);
- _mp._backup_history.clear();
- _mp.reset_records();
- _mp.set_maxcall_start_backup_app_meta_unlocked(0);
-
- _mp.set_maxcall_finish_backup_app_unlocked(4);
- _mp.trigger_beyond_finish_backup_app_unlocked() = false;
- _mp.set_maxcall_write_backup_app_finish_flag_unlocked(4);
- _mp.trigger_beyond_write_backup_app_finish_flag_unlocked() = false;
-
- state->_all_apps[3]->status = dsn::app_status::AS_AVAILABLE;
- _mp.issue_new_backup_unlocked();
- }
-
- ASSERT_TRUE(_mp.notifier_start_backup_app_meta_unlocked().wait_for(10000));
- ASSERT_TRUE(_mp.notifier_finish_backup_app_unlocked().wait_for(10000));
- ASSERT_TRUE(_mp.notifier_write_backup_app_finish_flag_unlocked().wait_for(10000));
-
- {
- zauto_lock l(_mp._lock);
- ASSERT_EQ(1, _mp.counter_start_backup_app_meta_unlocked());
- ASSERT_EQ(4, _mp.counter_finish_backup_app_unlocked());
- }
- }
-
- {
- // test case: app is dropped when start backup meta
- // Prepare: prepare the current backup, then mark the app as dropped
- // Result: all apps will be marked as finished, new backup will be issued
- std::cout << "app is dropped when start to backup meta" << std::endl;
- app_state *app = state->_all_apps[3].get();
-
- {
- zauto_lock l(_mp._lock);
- _mp._backup_history.clear();
- _mp.reset_records();
-
- _mp.prepare_current_backup_on_new_unlocked();
- dsn::task_ptr tsk = tasking::create_task(TASK_CODE_EXEC_INLINED, nullptr, []() {});
- _mp.sync_backup_to_remote_storage_unlocked(_mp._cur_backup, tsk, true);
- tsk->wait();
- _mp.set_maxcall_issue_new_backup_unlocked(1);
-
- ASSERT_EQ(_mp._progress.unfinished_apps, _policy.app_ids.size());
- app->status = dsn::app_status::AS_DROPPED;
-
- _mp.continue_current_backup_unlocked();
- }
-
- // new backup will be issued 5s later.
- ASSERT_TRUE(_mp.notifier_issue_new_backup_unlocked().wait_for(20000));
-
- {
- zauto_lock l(_mp._lock);
- ASSERT_EQ(0, _mp._cur_backup.end_time_ms);
- ASSERT_EQ(0, _mp._progress.unfinished_apps);
- ASSERT_EQ(app->partition_count, _mp._progress.partition_progress.size());
-
- const backup_info &history = _mp._backup_history.begin()->second;
- ASSERT_NE(0, history.start_time_ms);
- ASSERT_GE(history.end_time_ms, history.start_time_ms);
-
- for (const auto &kv : _mp._progress.partition_progress) {
- ASSERT_EQ(kv.first.get_app_id(), app->app_id);
- ASSERT_EQ(kv.second, 1000);
- }
- for (const auto &kv : _mp._progress.unfinished_partitions_per_app) {
- ASSERT_EQ(0, kv.second);
- }
- }
- }
-
- {
- // test_case: a full backup procedure
- // Prepare: issue a new backup
- // Result: a new backup will be issued, and we have a entry on remote storage
- std::cout << "a successful entire backup" << std::endl;
- int64_t cur_start_time_ms = static_cast<int64_t>(dsn_now_ms());
- {
- zauto_lock l(_mp._lock);
- std::vector<std::pair<dsn::host_port, dsn::rpc_address>> node_list;
- generate_node_list(node_list, 3, 3);
-
- app_state *app = state->_all_apps[3].get();
- app->status = dsn::app_status::AS_AVAILABLE;
- for (partition_configuration &pc : app->partitions) {
- pc.primary = node_list[0].second;
- pc.secondaries = {node_list[1].second, node_list[2].second};
- pc.__set_hp_primary(node_list[0].first);
- pc.__set_hp_secondaries({node_list[1].first, node_list[2].first});
- }
-
- _mp._backup_history.clear();
- _mp.reset_records();
-
- // issue_in_test -> issued by finish all apps -> a delay for backup interval
- _mp.set_maxcall_issue_new_backup_unlocked(2);
- _mp.issue_new_backup_unlocked();
- }
-
- ASSERT_TRUE(_mp.notifier_issue_new_backup_unlocked().wait_for(10000));
-
- {
- zauto_lock l(_mp._lock);
- // as new backup is captured and abandoned, so we can check the current backup
- ASSERT_EQ(1, _mp._backup_history.size());
- // the first backup's id is 1
- ASSERT_LE(cur_start_time_ms, _mp._backup_history.begin()->first);
- const backup_info &history = _mp._backup_history.begin()->second;
- ASSERT_NE(0, history.start_time_ms);
- ASSERT_GE(history.end_time_ms, history.start_time_ms);
-
- // check the progress
- for (const auto &kv : _mp._progress.partition_progress) {
- ASSERT_EQ(kv.second, 1000);
- }
- ASSERT_EQ(0, _mp._progress.unfinished_apps);
- }
- }
-
- {
- // test case: add backup_history
- std::cout << "test add backup history" << std::endl;
-
- _mp._backup_history.clear();
- _mp._cur_backup.backup_id = 0;
- _mp._cur_backup.end_time_ms = 0;
-
- backup_info bi;
- bi.start_time_ms = 100;
- bi.end_time_ms = 110;
- bi.app_ids = {1, 2, 3};
- bi.backup_id = bi.start_time_ms;
- _mp.add_backup_history(bi);
-
- bi.start_time_ms += 1000;
- bi.end_time_ms += 1000;
- bi.app_ids = {1, 2, 5};
- bi.backup_id = bi.start_time_ms;
- _mp.add_backup_history(bi);
-
- bi.start_time_ms += 1000;
- bi.end_time_ms = 0;
- bi.app_ids = {1, 2, 7};
- bi.backup_id = bi.start_time_ms;
-
- _mp.add_backup_history(bi);
-
- ASSERT_EQ(bi.backup_id, _mp._cur_backup.backup_id);
- ASSERT_EQ(bi.app_ids, _mp._cur_backup.app_ids);
- ASSERT_EQ(0, _mp._cur_backup.end_time_ms);
-
- ASSERT_EQ(bi.app_ids.size(), _mp._progress.unfinished_apps);
- ASSERT_EQ(2, _mp._backup_history.size());
-
- std::string cur_backup_sig =
- test_policy_name + std::string("@") + std::to_string(bi.backup_id);
- ASSERT_EQ(cur_backup_sig, _mp._backup_sig);
- }
-}
-
-TEST_F(policy_context_test, test_disable_backup_policy)
-{
- _policy = _mp.get_policy();
- _policy.is_disable = true;
- _mp.set_policy(_policy);
-
- _mp._backup_history.clear();
- _mp._cur_backup.backup_id = 0;
- _mp._cur_backup.end_time_ms = 0;
-
- backup_info bi;
- bi.start_time_ms = dsn_now_ms();
- bi.end_time_ms = 0;
- bi.app_ids = {1};
- bi.backup_id = bi.start_time_ms;
- _mp.add_backup_history(bi);
-
- // 'start_backup_app_meta_unlocked()' should not be called because policy is disabled
- _mp.continue_current_backup_unlocked();
- ASSERT_FALSE(_mp.notifier_start_backup_app_meta_unlocked().wait_for(5000));
-}
-
-TEST_F(policy_context_test, test_backup_failed)
-{
- fail::setup();
- fail::cfg("mock_local_service_write_failed", "100%1*return(ERR_FS_INTERNAL)");
-
- // app 1 is available.
- dsn::app_info info;
- info.is_stateful = true;
- info.app_id = 1;
- info.app_type = "simple_kv";
- info.max_replica_count = 3;
- info.partition_count = 4;
- info.status = dsn::app_status::AS_AVAILABLE;
- _service->get_server_state()->_all_apps.emplace(info.app_id, app_state::create(info));
-
- {
- zauto_lock l(_mp._lock);
- _mp._backup_history.clear();
- _mp.reset_records();
-
- // start backup in this policy
- _mp.issue_new_backup_unlocked();
- }
- sleep(1);
- {
- zauto_lock l(_mp._lock);
- ASSERT_TRUE(_mp._is_backup_failed);
- }
- ASSERT_FALSE(_mp.is_under_backuping());
-
- fail::teardown();
-}
-
-// test should_start_backup_unlock()
-TEST_F(policy_context_test, test_should_start_backup)
-{
- uint64_t now = dsn_now_ms();
- int32_t hour = 0, min = 0, sec = 0;
- ::dsn::utils::time_ms_to_date_time(now, hour, min, sec);
- while (min == 59) {
- std::this_thread::sleep_for(std::chrono::minutes(1));
- now = dsn_now_ms();
- ::dsn::utils::time_ms_to_date_time(now, hour, min, sec);
- }
-
- int64_t oneday_sec = 1 * 24 * 60 * 60;
- _mp._policy.start_time.hour = hour;
- _mp._policy.start_time.minute = 0;
- _mp._policy.backup_interval_seconds = oneday_sec; // oneday
- _mp._backup_history.clear();
-
- backup_info info;
-
- {
- std::cout << "first backup & no limit to start_time" << std::endl;
- _mp._policy.start_time.hour = 24;
- ASSERT_TRUE(_mp.should_start_backup_unlocked());
- }
-
- {
- std::cout << "first backup & cur_time.hour == start_time.hour" << std::endl;
- _mp._policy.start_time.hour = hour;
- ASSERT_TRUE(_mp.should_start_backup_unlocked());
- }
-
- {
- std::cout << "first backup & cur_time.hour != start_time.hour" << std::endl;
- _mp._policy.start_time.hour = hour + 100; // invalid time
- ASSERT_FALSE(_mp.should_start_backup_unlocked());
- _mp._policy.start_time.hour = (hour + 1) % 24; // valid, but not reach
- ASSERT_FALSE(_mp.should_start_backup_unlocked());
- _mp._policy.start_time.hour = hour - 1; // time passed(also, include -1)
- ASSERT_FALSE(_mp.should_start_backup_unlocked());
- }
-
- {
- std::cout << "not first backup & recent backup delay 20min to start" << std::endl;
- info.start_time_ms = now - (oneday_sec * 1000) + 20 * 60 * 1000;
- info.end_time_ms = info.start_time_ms + 10;
- _mp.add_backup_history(info);
- // if we set start_time to 24:00, then will not start backup
- _mp._policy.start_time.hour = 24;
- ASSERT_FALSE(_mp.should_start_backup_unlocked());
- // if we set start_time to hour:00, then will start backup, even if the interval <
- // policy.backup_interval
- _mp._policy.start_time.hour = hour;
- ASSERT_TRUE(_mp.should_start_backup_unlocked());
- }
-
- {
- std::cout << "not first backup & recent backup start time is equal with start_time"
- << std::endl;
- _mp._policy.start_time.hour = hour;
- _mp._backup_history.clear();
- info.start_time_ms = now - (oneday_sec * 1000) - (min * 60 * 1000);
- info.start_time_ms = (info.start_time_ms / 1000) * 1000;
- info.end_time_ms = info.start_time_ms + 10;
- _mp.add_backup_history(info);
- ASSERT_TRUE(_mp.should_start_backup_unlocked());
- }
-
- {
- // delay the start_time
- std::cout << "not first backup & delay the start time of policy" << std::endl;
- _mp._policy.start_time.hour = hour + 1;
- _mp._backup_history.clear();
- // make sure the start time of recent backup is litte than policy's start_time, so we
- // minus more 3min
- info.start_time_ms = now - (oneday_sec * 1000) - 3 * 60 * 1000;
- info.end_time_ms = info.start_time_ms + 10;
- _mp.add_backup_history(info);
- if (_mp._policy.start_time.hour == 24) {
- // if hour = 23, then policy.start_time.hour = 24, we should start next backup,
- // because now - info.start_time_ms > policy.backup_interval
- ASSERT_TRUE(_mp.should_start_backup_unlocked());
- } else {
- // should not start, even if now - info.start_time_ms > policy.backup_interval, but
- // not reach the time-point that policy.start_time limit
- ASSERT_FALSE(_mp.should_start_backup_unlocked());
- }
- }
-
- {
- std::cout << "not first backup & no limit to start time & should start backup" << std::endl;
- _mp._policy.start_time.hour = 24;
- _mp._backup_history.clear();
- info.start_time_ms = now - (oneday_sec * 1000) - 3 * 60 * 60;
- info.end_time_ms = info.start_time_ms + 10;
- _mp.add_backup_history(info);
- ASSERT_TRUE(_mp.should_start_backup_unlocked());
- }
-
- {
- std::cout << "not first backup & no limit to start time & should not start backup"
- << std::endl;
- _mp._backup_history.clear();
- info.start_time_ms = now - (oneday_sec * 1000) + 3 * 60 * 60;
- info.end_time_ms = info.start_time_ms + 10;
- _mp.add_backup_history(info);
- ASSERT_FALSE(_mp.should_start_backup_unlocked());
- }
-}
-
-class meta_backup_service_test : public meta_test_base
-{
-protected:
- meta_backup_service_test() : _meta_svc(new fake_receiver_meta_service()), _backup_svc(nullptr)
- {
- }
-
- void SetUp() override
- {
- meta_test_base::SetUp();
-
- meta_options &opt = _meta_svc->_meta_opts;
- FLAGS_cluster_root = "/meta_test";
- FLAGS_meta_state_service_type = "meta_state_service_simple";
- _meta_svc->remote_storage_initialize();
- std::string backup_root = "/backup_test";
- std::string policy_meta_root = std::string(FLAGS_cluster_root) + "/backup_policies";
- _meta_svc->_backup_handler = std::make_shared<backup_service>(
- _meta_svc.get(), policy_meta_root, backup_root, [](backup_service *bs) {
- return std::make_shared<mock_policy>(bs);
- });
- _backup_svc = _meta_svc->_backup_handler.get();
- }
-
- std::shared_ptr<meta_service> _meta_svc;
- backup_service *_backup_svc;
-};
-
-TEST_F(meta_backup_service_test, test_add_backup_policy)
-{
- // create policy meta root.
- bool flag = false;
- dsn::task_ptr task_test =
- tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [&flag]() { flag = true; });
- _backup_svc->start_create_policy_meta_root(task_test);
- while (!flag) {
- std::cout << "wait create policy_meta_root succeed" << std::endl;
- sleep(1);
- }
- ASSERT_TRUE(flag);
-
- configuration_add_backup_policy_request req;
- req.backup_provider_type = std::string("local_service");
- req.policy_name = test_policy_name;
- req.app_ids = {1, 2, 3};
- req.backup_interval_seconds = 24 * 60 * 60;
-
- // case1: backup policy doesn't contain any valid app_id
- // result: backup policy will not be added, and return ERR_INVALID_PARAMETERS
- {
- configuration_add_backup_policy_response resp;
- auto r = fake_rpc_call(RPC_CM_ADD_BACKUP_POLICY,
- LPC_DEFAULT_CALLBACK,
- _backup_svc,
- &backup_service::add_backup_policy,
- req);
- fake_wait_rpc(r, resp);
- ASSERT_EQ(ERR_INVALID_PARAMETERS, resp.err);
- // hint message contains the first invalid app id
- std::string hint_message = "invalid app 1";
- ASSERT_EQ(hint_message, resp.hint_message);
- }
-
- // case2: backup policy interval time < checkpoint reserve time
- // result: backup policy will not be added, and return ERR_INVALID_PARAMETERS
- {
- int64_t old_backup_interval_seconds = req.backup_interval_seconds;
- req.backup_interval_seconds = 10;
- configuration_add_backup_policy_response resp;
- server_state *state = _meta_svc->get_server_state();
- state->_all_apps.insert(std::make_pair(1, std::make_shared<app_state>(app_info())));
- auto r = fake_rpc_call(RPC_CM_ADD_BACKUP_POLICY,
- LPC_DEFAULT_CALLBACK,
- _backup_svc,
- &backup_service::add_backup_policy,
- req);
- fake_wait_rpc(r, resp);
-
- std::string hint_message = fmt::format(
- "backup interval must be larger than cold_backup_checkpoint_reserve_minutes={}",
- FLAGS_cold_backup_checkpoint_reserve_minutes);
- ASSERT_EQ(ERR_INVALID_PARAMETERS, resp.err);
- ASSERT_EQ(hint_message, resp.hint_message);
- req.backup_interval_seconds = old_backup_interval_seconds;
- }
-
- // case3: backup policy contains valid and invalid app_id
- // result: backup policy will not be added, and return ERR_INVALID_PARAMETERS
- {
- configuration_add_backup_policy_response resp;
- server_state *state = _meta_svc->get_server_state();
- state->_all_apps.insert(std::make_pair(1, std::make_shared<app_state>(app_info())));
- auto r = fake_rpc_call(RPC_CM_ADD_BACKUP_POLICY,
- LPC_DEFAULT_CALLBACK,
- _backup_svc,
- &backup_service::add_backup_policy,
- req);
- fake_wait_rpc(r, resp);
- ASSERT_EQ(ERR_INVALID_PARAMETERS, resp.err);
- // hint message contains the first invalid app id
- std::string hint_message = "invalid app 2";
- ASSERT_EQ(hint_message, resp.hint_message);
- }
-
- // case4: backup policy only contains valid app_id
- // result: add_backup_policy succeed
- {
- configuration_add_backup_policy_response resp;
- server_state *state = _meta_svc->get_server_state();
- state->_all_apps.insert(std::make_pair(2, std::make_shared<app_state>(app_info())));
- state->_all_apps.insert(std::make_pair(3, std::make_shared<app_state>(app_info())));
- auto r = fake_rpc_call(RPC_CM_ADD_BACKUP_POLICY,
- LPC_DEFAULT_CALLBACK,
- _backup_svc,
- &backup_service::add_backup_policy,
- req);
- fake_wait_rpc(r, resp);
- ASSERT_EQ(ERR_OK, resp.err);
- }
-
- // test sync_policies_from_remote_storage
- _backup_svc->_policy_states.clear();
- ASSERT_TRUE(_backup_svc->_policy_states.empty());
- error_code err = _backup_svc->sync_policies_from_remote_storage();
- ASSERT_EQ(ERR_OK, err);
- ASSERT_EQ(1, _backup_svc->_policy_states.size());
- ASSERT_TRUE(_backup_svc->_policy_states.find(test_policy_name) !=
- _backup_svc->_policy_states.end());
- const policy &p = _backup_svc->_policy_states.at(test_policy_name)->get_policy();
- ASSERT_EQ(3, p.app_ids.size());
- ASSERT_EQ("local_service", p.backup_provider_type);
- ASSERT_EQ(24 * 60 * 60, p.backup_interval_seconds);
- ASSERT_EQ(test_policy_name, p.policy_name);
-}
-
-} // namespace replication
-} // namespace dsn
diff --git a/src/meta/test/json_compacity.cpp b/src/meta/test/json_compacity.cpp
index c1ca9a7..01faa8e 100644
--- a/src/meta/test/json_compacity.cpp
+++ b/src/meta/test/json_compacity.cpp
@@ -25,18 +25,14 @@
*/
#include <string.h>
-#include <cstdint>
#include <iostream>
-#include <map>
#include <memory>
-#include <set>
#include <string>
#include <vector>
#include "common/json_helper.h"
#include "dsn.layer2_types.h"
#include "gtest/gtest.h"
-#include "meta/meta_backup_service.h"
#include "meta_service_test_app.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
@@ -120,57 +116,6 @@
ASSERT_TRUE(result);
ASSERT_EQ(info2.app_name, "CL769:test");
ASSERT_EQ(info2.max_replica_count, 3);
-
- // 7. policy can be decoded correctly
- const char *json7 = "{\"policy_name\":\"every_day\",\"backup_provider_type\":\"simple\",\"app_"
- "ids\":[4,5,6,7,8,9,10,11,14,15,16,17,18,19,21,22,23,24],\"app_names\":{"
- "\"4\":\"aaaa\",\"5\":\"aaaa\",\"6\":\"aaaa\",\"7\":\"aaaa\",\"8\":"
- "\"aaaa\",\"9\":\"aaaa\",\"10\":\"aaaa\",\"11\":\"aaaa\",\"14\":\"aaaa\","
- "\"15\":\"aaaa\",\"16\":\"aaaa\",\"17\":\"aaaa\",\"18\":\"aaaa\",\"19\":"
- "\"aaaa\",\"21\":\"aaaa\",\"22\":\"aaaa\",\"23\":\"aaaa\",\"24\":\"aaaa\"},"
- "\"backup_interval_seconds\":86400,\"backup_history_count_to_keep\":3,\"is_"
- "disable\":0,\"start_time\":{\"hour\":0,\"minute\":30}}";
- dsn::replication::policy p;
- result = dsn::json::json_forwarder<dsn::replication::policy>::decode(
- dsn::blob(json7, 0, strlen(json7)), p);
- ASSERT_TRUE(result);
- ASSERT_EQ("every_day", p.policy_name);
- ASSERT_EQ("simple", p.backup_provider_type);
-
- std::set<int32_t> app_ids = {4, 5, 6, 7, 8, 9, 10, 11, 14, 15, 16, 17, 18, 19, 21, 22, 23, 24};
- ASSERT_EQ(app_ids, p.app_ids);
-
- std::map<int32_t, std::string> app_names;
- for (int32_t i : app_ids) {
- app_names.emplace(i, "aaaa");
- }
- ASSERT_EQ(app_names, p.app_names);
- ASSERT_EQ(86400, p.backup_interval_seconds);
- ASSERT_EQ(3, p.backup_history_count_to_keep);
- ASSERT_EQ(0, p.is_disable);
- ASSERT_EQ(0, p.start_time.hour);
- ASSERT_EQ(30, p.start_time.minute);
-
- // 8. backup info can be decoded correctly
- const char *json8 =
- "{\"backup_id\":1528216470578,\"start_time_ms\":1528216470578,\"end_time_"
- "ms\":1528217091629,\"app_ids\":[4,5,6,7,8,9,10,11,14,15,16,17,18,19,21,22,"
- "23,24],\"app_names\":{\"4\":\"aaaa\",\"5\":\"aaaa\",\"6\":\"aaaa\",\"7\":\"aaaa\",\"8\":"
- "\"aaaa\",\"9\":\"aaaa\",\"10\":\"aaaa\",\"11\":\"aaaa\",\"14\":\"aaaa\",\"15\":\"aaaa\","
- "\"16\":"
- "\"aaaa\",\"17\":\"aaaa\",\"18\":\"aaaa\",\"19\":\"aaaa\",\"21\":\"aaaa\",\"22\":\"aaaa\","
- "\"23\":"
- "\"aaaa\",\"24\":\"aaaa\"},\"info_status\":1}";
- dsn::replication::backup_info binfo;
- result = dsn::json::json_forwarder<dsn::replication::backup_info>::decode(
- dsn::blob(json8, 0, strlen(json8)), binfo);
- ASSERT_TRUE(result);
- ASSERT_EQ(1528216470578, binfo.backup_id);
- ASSERT_EQ(1528216470578, binfo.start_time_ms);
- ASSERT_EQ(1528217091629, binfo.end_time_ms);
- ASSERT_EQ(app_ids, binfo.app_ids);
- ASSERT_EQ(app_names, binfo.app_names);
- ASSERT_EQ(1, binfo.info_status);
}
} // namespace replication
diff --git a/src/meta/test/meta_backup_test.cpp b/src/meta/test/meta_backup_test.cpp
index 5a77b13..b248758 100644
--- a/src/meta/test/meta_backup_test.cpp
+++ b/src/meta/test/meta_backup_test.cpp
@@ -14,359 +14,3 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-#include <cstdint>
-#include <map>
-#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "backup_types.h"
-#include "common/backup_common.h"
-#include "common/gpid.h"
-#include "common/replication.codes.h"
-#include "gtest/gtest.h"
-#include "meta/backup_engine.h"
-#include "meta/meta_backup_service.h"
-#include "meta/meta_data.h"
-#include "meta/meta_rpc_types.h"
-#include "meta/meta_service.h"
-#include "meta/server_state.h"
-#include "meta_test_base.h"
-#include "runtime/api_layer1.h"
-#include "runtime/rpc/rpc_host_port.h"
-#include "utils/env.h"
-#include "utils/error_code.h"
-#include "utils/fail_point.h"
-#include "utils/filesystem.h"
-#include "utils/zlocks.h"
-
-namespace dsn {
-namespace replication {
-
-class backup_service_test : public meta_test_base
-{
-public:
- backup_service_test()
- : _policy_root("test_policy_root"),
- _backup_root("test_backup_root"),
- _app_name("test_app"),
- _backup_service(nullptr)
- {
- }
-
- void SetUp() override
- {
- meta_test_base::SetUp();
- _ms->_backup_handler =
- std::make_shared<backup_service>(_ms.get(), _policy_root, _backup_root, nullptr);
- _backup_service = _ms->_backup_handler;
-
- // create an app with 8 partitions.
- create_app(_app_name);
- }
-
- start_backup_app_response
- start_backup(int32_t app_id, const std::string &provider, const std::string &backup_path = "")
- {
- auto request = std::make_unique<start_backup_app_request>();
- request->app_id = app_id;
- request->backup_provider_type = provider;
- if (!backup_path.empty()) {
- request->__set_backup_path(backup_path);
- }
-
- start_backup_app_rpc rpc(std::move(request), RPC_CM_START_BACKUP_APP);
- _backup_service->start_backup_app(rpc);
- wait_all();
- return rpc.response();
- }
-
- query_backup_status_response query_backup(int32_t app_id, int64_t backup_id)
- {
- auto request = std::make_unique<query_backup_status_request>();
- request->app_id = app_id;
- request->__isset.backup_id = true;
- request->backup_id = backup_id;
-
- query_backup_status_rpc rpc(std::move(request), RPC_CM_QUERY_BACKUP_STATUS);
- _backup_service->query_backup_status(rpc);
- wait_all();
- return rpc.response();
- }
-
- bool write_metadata_succeed(int32_t app_id,
- int64_t backup_id,
- const std::string &user_specified_path)
- {
- std::string backup_root = dsn::utils::filesystem::path_combine(
- user_specified_path, _backup_service->backup_root());
- auto app = _ms->_state->get_app(app_id);
- std::string metadata_file =
- cold_backup::get_app_metadata_file(backup_root, app->app_name, app_id, backup_id);
-
- int64_t metadata_file_size = 0;
- if (!dsn::utils::filesystem::file_size(
- metadata_file, dsn::utils::FileDataType::kSensitive, metadata_file_size)) {
- return false;
- }
- return metadata_file_size > 0;
- }
-
- void test_specific_backup_path(int32_t test_app_id, const std::string &user_specified_path = "")
- {
- auto resp = start_backup(test_app_id, "local_service_empty_root", user_specified_path);
- ASSERT_EQ(ERR_OK, resp.err);
- ASSERT_TRUE(resp.__isset.backup_id);
- ASSERT_EQ(1, _backup_service->_backup_states.size());
-
- auto backup_engine = _backup_service->_backup_states[0];
- if (user_specified_path.empty()) {
- ASSERT_TRUE(backup_engine->_backup_path.empty());
- } else {
- ASSERT_EQ(user_specified_path, backup_engine->_backup_path);
- }
-
- int64_t backup_id = resp.backup_id;
- ASSERT_TRUE(write_metadata_succeed(test_app_id, backup_id, user_specified_path));
- }
-
-protected:
- const std::string _policy_root;
- const std::string _backup_root;
- const std::string _app_name;
- std::shared_ptr<backup_service> _backup_service;
-};
-
-TEST_F(backup_service_test, test_invalid_backup_request)
-{
- // invalid app id.
- int32_t test_app_id = _ss->next_app_id();
- auto resp = start_backup(test_app_id, "local_service");
- ASSERT_EQ(ERR_INVALID_STATE, resp.err);
-
- // invalid provider.
- resp = start_backup(1, "invalid_provider");
- ASSERT_EQ(ERR_INVALID_PARAMETERS, resp.err);
-}
-
-TEST_F(backup_service_test, test_init_backup)
-{
- int64_t now = dsn_now_ms();
- auto resp = start_backup(1, "local_service");
- ASSERT_EQ(ERR_OK, resp.err);
- ASSERT_LE(now, resp.backup_id);
- ASSERT_EQ(1, _backup_service->_backup_states.size());
-
- // backup for app 1 is running, couldn't backup it again.
- resp = start_backup(1, "local_service");
- ASSERT_EQ(ERR_INVALID_STATE, resp.err);
-
- resp = start_backup(2, "local_service");
- ASSERT_EQ(ERR_OK, resp.err);
-}
-
-TEST_F(backup_service_test, test_write_backup_metadata_failed)
-{
- fail::setup();
- fail::cfg("mock_local_service_write_failed", "100%1*return(ERR_FS_INTERNAL)");
-
- // we couldn't start backup an app if write backup metadata failed.
- auto resp = start_backup(1, "local_service");
- ASSERT_EQ(ERR_FS_INTERNAL, resp.err);
-
- fail::teardown();
-}
-
-TEST_F(backup_service_test, test_backup_app_with_no_specific_path) { test_specific_backup_path(1); }
-
-TEST_F(backup_service_test, test_backup_app_with_user_specified_path)
-{
- test_specific_backup_path(1, "test/backup");
-}
-
-TEST_F(backup_service_test, test_query_backup_status)
-{
- // query a backup that does not exist
- auto resp = query_backup(1, 1);
- ASSERT_EQ(ERR_INVALID_PARAMETERS, resp.err);
-
- auto start_backup_resp = start_backup(1, "local_service");
- ASSERT_EQ(ERR_OK, start_backup_resp.err);
- ASSERT_EQ(1, _backup_service->_backup_states.size());
-
- // query backup succeed
- int64_t backup_id = start_backup_resp.backup_id;
- resp = query_backup(1, backup_id);
- ASSERT_EQ(ERR_OK, resp.err);
- ASSERT_TRUE(resp.__isset.backup_items);
- ASSERT_EQ(1, resp.backup_items.size());
-}
-
-class backup_engine_test : public meta_test_base
-{
-public:
- backup_engine_test()
- : _policy_root("test_policy_root"),
- _backup_root("test_backup_root"),
- _app_name("test_app"),
- _app_id(1),
- _partition_count(8),
- _backup_engine(nullptr)
- {
- }
-
- void SetUp() override
- {
- meta_test_base::SetUp();
- _ms->_backup_handler =
- std::make_shared<backup_service>(_ms.get(), _policy_root, _backup_root, nullptr);
- _backup_engine = std::make_shared<backup_engine>(_ms->_backup_handler.get());
- _backup_engine->set_block_service("local_service");
-
- zauto_lock lock(_backup_engine->_lock);
- _backup_engine->_backup_status.clear();
- for (int i = 0; i < _partition_count; ++i) {
- _backup_engine->_backup_status.emplace(i, backup_status::UNALIVE);
- }
- _backup_engine->_cur_backup.app_id = _app_id;
- _backup_engine->_cur_backup.app_name = _app_name;
- _backup_engine->_cur_backup.backup_id = static_cast<int64_t>(dsn_now_ms());
- _backup_engine->_cur_backup.start_time_ms = _backup_engine->_cur_backup.backup_id;
- }
-
- void mock_backup_app_partitions()
- {
- zauto_lock l(_backup_engine->_lock);
- for (int i = 0; i < _partition_count; ++i) {
- _backup_engine->_backup_status[i] = backup_status::ALIVE;
- }
- }
-
- void mock_on_backup_reply(int32_t partition_index,
- error_code rpc_err,
- error_code resp_err,
- int32_t progress)
- {
- gpid pid = gpid(_app_id, partition_index);
- const auto hp_mock_primary = host_port("localhost", 10000 + partition_index);
-
- backup_response resp;
- resp.backup_id = _backup_engine->_cur_backup.backup_id;
- resp.pid = pid;
- resp.err = resp_err;
- resp.progress = progress;
-
- _backup_engine->on_backup_reply(rpc_err, resp, pid, hp_mock_primary);
- }
-
- void mock_on_backup_reply_when_timeout(int32_t partition_index, error_code rpc_err)
- {
- gpid pid = gpid(_app_id, partition_index);
- const auto hp_mock_primary = host_port("localhost", 10000 + partition_index);
- backup_response resp;
- _backup_engine->on_backup_reply(rpc_err, resp, pid, hp_mock_primary);
- }
-
- bool is_backup_failed() const
- {
- zauto_lock l(_backup_engine->_lock);
- return _backup_engine->_is_backup_failed;
- }
-
- void reset_backup_engine()
- {
- zauto_lock l(_backup_engine->_lock);
- _backup_engine->_is_backup_failed = false;
- }
-
-protected:
- const std::string _policy_root;
- const std::string _backup_root;
- const std::string _app_name;
- const int32_t _app_id;
- const int32_t _partition_count;
- std::shared_ptr<backup_engine> _backup_engine;
-};
-
-TEST_F(backup_engine_test, test_on_backup_reply)
-{
- mock_backup_app_partitions();
-
- // recieve a rpc error
- mock_on_backup_reply(/*partition_index=*/0, ERR_NETWORK_FAILURE, ERR_BUSY, /*progress=*/0);
- ASSERT_TRUE(_backup_engine->is_in_progress());
-
- // recieve a backup finished response
- reset_backup_engine();
- mock_on_backup_reply(/*partition_index=*/1,
- ERR_OK,
- ERR_OK,
- /*progress=*/cold_backup_constant::PROGRESS_FINISHED);
- ASSERT_TRUE(_backup_engine->is_in_progress());
-
- // receive a backup in-progress response
- reset_backup_engine();
- mock_on_backup_reply(/*partition_index=*/2, ERR_OK, ERR_BUSY, /*progress=*/0);
- ASSERT_TRUE(_backup_engine->is_in_progress());
- ASSERT_EQ(_backup_engine->_backup_status[2], backup_status::ALIVE);
-
- // if one partition fail, all backup plan will fail
- {
- // receive a backup failed response
- reset_backup_engine();
- mock_on_backup_reply(/*partition_index=*/3, ERR_OK, ERR_LOCAL_APP_FAILURE, /*progress=*/0);
- ASSERT_TRUE(is_backup_failed());
-
- // this backup is still a failure even received non-failure response
- mock_on_backup_reply(/*partition_index=*/4, ERR_OK, ERR_BUSY, /*progress=*/0);
- ASSERT_TRUE(is_backup_failed());
-
- mock_on_backup_reply(/*partition_index=*/5,
- ERR_OK,
- ERR_OK,
- /*progress=*/cold_backup_constant::PROGRESS_FINISHED);
- ASSERT_TRUE(is_backup_failed());
- }
-
- // meta request is timeout
- reset_backup_engine();
- mock_on_backup_reply_when_timeout(/*partition_index=*/5, ERR_TIMEOUT);
- ASSERT_FALSE(is_backup_failed());
-}
-
-TEST_F(backup_engine_test, test_backup_completed)
-{
- mock_backup_app_partitions();
- for (int i = 0; i < _partition_count; ++i) {
- mock_on_backup_reply(/*partition_index=*/i,
- ERR_OK,
- ERR_OK,
- /*progress=*/cold_backup_constant::PROGRESS_FINISHED);
- }
- ASSERT_FALSE(is_backup_failed());
- ASSERT_LE(_backup_engine->_cur_backup.start_time_ms, _backup_engine->_cur_backup.end_time_ms);
-}
-
-TEST_F(backup_engine_test, test_write_backup_info_failed)
-{
- fail::setup();
- fail::cfg("mock_local_service_write_failed", "100%1*return(ERR_FS_INTERNAL)");
-
- // finish all partitions backup but write backup info failed.
- mock_backup_app_partitions();
- for (int i = 0; i < _partition_count; ++i) {
- mock_on_backup_reply(/*partition_index=*/i,
- ERR_OK,
- ERR_OK,
- /*progress=*/cold_backup_constant::PROGRESS_FINISHED);
- }
- ASSERT_TRUE(is_backup_failed());
- ASSERT_EQ(0, _backup_engine->_cur_backup.end_time_ms);
-
- fail::teardown();
-}
-
-} // namespace replication
-} // namespace dsn
diff --git a/src/meta/test/meta_http_service_test.cpp b/src/meta/test/meta_http_service_test.cpp
index ab0f3cf..631a20c 100644
--- a/src/meta/test/meta_http_service_test.cpp
+++ b/src/meta/test/meta_http_service_test.cpp
@@ -15,38 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
-#include <vector>
-#include "backup_types.h"
#include "bulk_load_types.h"
#include "common/gpid.h"
#include "common/replication_other_types.h"
#include "gtest/gtest.h"
#include "http/http_server.h"
#include "http/http_status_code.h"
-#include "meta/meta_backup_service.h"
#include "meta/meta_bulk_load_service.h"
#include "meta/meta_data.h"
#include "meta/meta_http_service.h"
-#include "meta/meta_service.h"
-#include "meta/meta_state_service.h"
-#include "meta_service_test_app.h"
#include "meta_test_base.h"
-#include "runtime/rpc/rpc_holder.h"
-#include "runtime/rpc/rpc_message.h"
-#include "runtime/task/task.h"
-#include "runtime/task/task_code.h"
-#include "utils/autoref_ptr.h"
#include "utils/blob.h"
-#include "utils/chrono_literals.h"
-#include "utils/error_code.h"
#include "utils/fail_point.h"
namespace dsn {
@@ -104,68 +90,10 @@
std::string test_app = "test_meta_http";
};
+// TODO(heyuchen): re-implement this fixture once the reworked backup implementation is in place
class meta_backup_test_base : public meta_test_base
{
public:
- void SetUp() override
- {
- meta_test_base::SetUp();
-
- _ms->_backup_handler = std::make_shared<backup_service>(
- _ms.get(),
- _ms->_cluster_root + "/backup_meta",
- _ms->_cluster_root + "/backup",
- [](backup_service *bs) { return std::make_shared<policy_context>(bs); });
- _ms->_backup_handler->start();
- _ms->_backup_handler->backup_option().app_dropped_retry_delay_ms = 500_ms;
- _ms->_backup_handler->backup_option().request_backup_period_ms = 20_ms;
- _ms->_backup_handler->backup_option().issue_backup_interval_ms = 1000_ms;
- const std::string policy_root = "/test";
- dsn::error_code ec;
- _ms->_storage
- ->create_node(
- _policy_root, dsn::TASK_CODE_EXEC_INLINED, [&ec](dsn::error_code err) { ec = err; })
- ->wait();
- _mhs = std::make_unique<meta_http_service>(_ms.get());
- create_app(test_app);
- }
-
- void add_backup_policy(const std::string &policy_name)
- {
- static const std::string test_policy_name = policy_name;
- const std::string policy_root = "/test";
-
- configuration_add_backup_policy_request request;
- configuration_add_backup_policy_response response;
-
- request.policy_name = policy_name;
- request.backup_provider_type = "local_service";
- request.backup_interval_seconds = 24 * 60 * 60;
- request.backup_history_count_to_keep = 1;
- request.start_time = "12:00";
- request.app_ids.clear();
- request.app_ids.push_back(2);
-
- auto result = fake_create_policy(_ms->_backup_handler.get(), request);
-
- fake_wait_rpc(result, response);
- // need to fix
- ASSERT_EQ(response.err, ERR_OK);
- }
-
- void test_get_backup_policy(const std::string &name,
- const std::string &expected_json,
- const http_status_code &http_status)
- {
- http_request req;
- http_response resp;
- if (!name.empty())
- req.query_args.emplace("name", name);
- _mhs->query_backup_policy_handler(req, resp);
- ASSERT_EQ(resp.status_code, http_status) << get_http_status_message(resp.status_code);
- ASSERT_EQ(resp.body, expected_json);
- }
-
protected:
const std::string _policy_root = "/test";
@@ -177,42 +105,8 @@
TEST_F(meta_http_service_test, get_app_envs) { test_get_app_envs(); }
-TEST_F(meta_backup_test_base, get_backup_policy)
-{
- struct http_backup_policy_test
- {
- std::string name;
- std::string expected_json;
- http_status_code http_status;
- } tests[5] = {
- {"", "{}\n", http_status_code::kOk},
- {"TEST1",
- "{\"TEST1\":{\"name\":\"TEST1\",\"backup_provider_type\":\"local_service\","
- "\"backup_interval\":\"86400\",\"app_ids\":\"[2]\",\"start_time\":\"12:00\","
- "\"status\":\"enabled\",\"backup_history_count\":\"1\"}}\n",
- http_status_code::kOk},
- {"TEST2",
- "{\"TEST2\":{\"name\":\"TEST2\",\"backup_provider_type\":\"local_service\","
- "\"backup_interval\":\"86400\",\"app_ids\":\"[2]\",\"start_time\":\"12:00\","
- "\"status\":\"enabled\",\"backup_history_count\":\"1\"}}\n",
- http_status_code::kOk},
- {"",
- "{\"TEST1\":{\"name\":\"TEST1\",\"backup_provider_type\":\"local_service\",\"backup_"
- "interval\":\"86400\",\"app_ids\":\"[2]\",\"start_time\":\"12:00\",\"status\":\"enabled\","
- "\"backup_history_count\":\"1\"},\"TEST2\":{\"name\":\"TEST2\",\"backup_provider_"
- "type\":\"local_service\",\"backup_interval\":\"86400\",\"app_ids\":\"[2]\",\"start_"
- "time\":\"12:00\",\"status\":\"enabled\",\"backup_history_count\":\"1\"}}\n",
- http_status_code::kOk},
- {"TEST3", "{}\n", http_status_code::kOk},
- };
- test_get_backup_policy(tests[0].name, tests[0].expected_json, tests[0].http_status);
- add_backup_policy("TEST1");
- test_get_backup_policy(tests[1].name, tests[1].expected_json, tests[1].http_status);
- add_backup_policy("TEST2");
- test_get_backup_policy(tests[2].name, tests[2].expected_json, tests[2].http_status);
- test_get_backup_policy(tests[3].name, tests[3].expected_json, tests[3].http_status);
- test_get_backup_policy(tests[4].name, tests[4].expected_json, tests[4].http_status);
-}
+// TODO(heyuchen): re-implement and re-enable this test for the reworked backup implementation
+// TEST_F(meta_backup_test_base, get_backup_policy) {}
class meta_bulk_load_http_test : public meta_test_base
{
diff --git a/src/meta/test/server_state_restore_test.cpp b/src/meta/test/server_state_restore_test.cpp
index 6c997b6..a1345c2 100644
--- a/src/meta/test/server_state_restore_test.cpp
+++ b/src/meta/test/server_state_restore_test.cpp
@@ -73,7 +73,7 @@
start_backup_app_rpc rpc(std::move(request), RPC_CM_START_BACKUP_APP);
_ms->_backup_handler =
- std::make_shared<backup_service>(_ms.get(), "mock_policy_root", _cluster_name, nullptr);
+ std::make_shared<backup_service>(_ms.get(), "mock_policy_root", _cluster_name);
_ms->_backup_handler->start_backup_app(rpc);
wait_all();
return rpc.response();
@@ -168,12 +168,14 @@
const std::string _provider;
};
-TEST_F(server_state_restore_test, test_restore_app) { test_restore_app(); }
+// TODO(heyuchen): update and re-enable the disabled tests below for the reworked backup flow
-TEST_F(server_state_restore_test, test_restore_app_with_specific_path)
-{
- test_restore_app("test_path");
-}
+// TEST_F(server_state_restore_test, test_restore_app) { test_restore_app(); }
+
+// TEST_F(server_state_restore_test, test_restore_app_with_specific_path)
+//{
+// test_restore_app("test_path");
+//}
} // namespace replication
} // namespace dsn
diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt
index 3bb64db..3bf778b 100644
--- a/src/replica/CMakeLists.txt
+++ b/src/replica/CMakeLists.txt
@@ -35,8 +35,6 @@
)
set(BACKUP_SRC backup/replica_backup_manager.cpp
- backup/cold_backup_context.cpp
- backup/replica_backup_server.cpp
)
set(BULK_LOAD_SRC bulk_load/replica_bulk_loader.cpp)
diff --git a/src/replica/backup/cold_backup_context.cpp b/src/replica/backup/cold_backup_context.cpp
deleted file mode 100644
index a96dd94..0000000
--- a/src/replica/backup/cold_backup_context.cpp
+++ /dev/null
@@ -1,1083 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "cold_backup_context.h"
-
-#include <chrono>
-#include <cstdint>
-#include <memory>
-// IWYU pragma: no_include <type_traits>
-
-#include "common/backup_common.h"
-#include "common/replication.codes.h"
-#include "replica/replica.h"
-#include "runtime/api_layer1.h"
-#include "runtime/task/async_calls.h"
-#include "utils/blob.h"
-#include "utils/error_code.h"
-#include "utils/filesystem.h"
-#include "utils/metrics.h"
-#include "utils/utils.h"
-
-namespace dsn {
-namespace replication {
-
-const char *cold_backup_status_to_string(cold_backup_status status)
-{
- switch (status) {
- case ColdBackupInvalid:
- return "ColdBackupInvalid";
- case ColdBackupChecking:
- return "ColdBackupChecking";
- case ColdBackupChecked:
- return "ColdBackupChecked";
- case ColdBackupCheckpointing:
- return "ColdBackupCheckpointing";
- case ColdBackupCheckpointed:
- return "ColdBackupCheckpointed";
- case ColdBackupUploading:
- return "ColdBackupUploading";
- case ColdBackupPaused:
- return "ColdBackupPaused";
- case ColdBackupCanceled:
- return "ColdBackupCanceled";
- case ColdBackupCompleted:
- return "ColdBackupCompleted";
- case ColdBackupFailed:
- return "ColdBackupFailed";
- default:
- CHECK(false, "");
- }
- return "ColdBackupXXX";
-}
-
-void cold_backup_context::cancel()
-{
- _status.store(ColdBackupCanceled);
- if (_owner_replica != nullptr) {
- METRIC_INCREMENT(*_owner_replica, backup_cancelled_count);
- }
-}
-
-bool cold_backup_context::start_check()
-{
- int invalid = ColdBackupInvalid;
- if (_status.compare_exchange_strong(invalid, ColdBackupChecking)) {
- _start_time_ms = dsn_now_ms();
- return true;
- } else {
- return false;
- }
-}
-
-bool cold_backup_context::fail_check(const char *failure_reason)
-{
- int checking = ColdBackupChecking;
- if (_status.compare_exchange_strong(checking, ColdBackupFailed)) {
- strncpy(_reason, failure_reason, sizeof(_reason) - 1);
- _reason[sizeof(_reason) - 1] = '\0';
- if (_owner_replica != nullptr) {
- METRIC_INCREMENT(*_owner_replica, backup_failed_count);
- }
- return true;
- } else {
- return false;
- }
-}
-
-bool cold_backup_context::complete_check(bool uploaded)
-{
- int checking = ColdBackupChecking;
- if (uploaded) {
- _progress.store(cold_backup_constant::PROGRESS_FINISHED);
- if (_owner_replica != nullptr) {
- METRIC_INCREMENT(*_owner_replica, backup_successful_count);
- }
- return _status.compare_exchange_strong(checking, ColdBackupCompleted);
- } else {
- return _status.compare_exchange_strong(checking, ColdBackupChecked);
- }
-}
-
-bool cold_backup_context::start_checkpoint()
-{
- int checked = ColdBackupChecked;
- if (_status.compare_exchange_strong(checked, ColdBackupCheckpointing)) {
- return true;
- } else {
- return false;
- }
-}
-
-bool cold_backup_context::fail_checkpoint(const char *failure_reason)
-{
- int checkpointing = ColdBackupCheckpointing;
- if (_status.compare_exchange_strong(checkpointing, ColdBackupFailed)) {
- strncpy(_reason, failure_reason, sizeof(_reason) - 1);
- _reason[sizeof(_reason) - 1] = '\0';
- if (_owner_replica != nullptr) {
- METRIC_INCREMENT(*_owner_replica, backup_failed_count);
- }
- return true;
- } else {
- return false;
- }
-}
-
-bool cold_backup_context::complete_checkpoint()
-{
- int checkpointing = ColdBackupCheckpointing;
- if (_status.compare_exchange_strong(checkpointing, ColdBackupCheckpointed)) {
- return true;
- } else {
- return false;
- }
-}
-bool cold_backup_context::fail_upload(const char *failure_reason)
-{
- int uploading = ColdBackupUploading;
- int paused = ColdBackupPaused;
- if (_status.compare_exchange_strong(uploading, ColdBackupFailed) ||
- _status.compare_exchange_strong(paused, ColdBackupFailed)) {
- strncpy(_reason, failure_reason, sizeof(_reason) - 1);
- _reason[sizeof(_reason) - 1] = '\0';
- if (_owner_replica != nullptr) {
- METRIC_INCREMENT(*_owner_replica, backup_failed_count);
- }
- return true;
- } else {
- return false;
- }
-}
-
-bool cold_backup_context::complete_upload()
-{
- int uploading = ColdBackupUploading;
- int paused = ColdBackupPaused;
- if (_status.compare_exchange_strong(uploading, ColdBackupCompleted) ||
- _status.compare_exchange_strong(paused, ColdBackupCompleted)) {
- _progress.store(cold_backup_constant::PROGRESS_FINISHED);
- if (_owner_replica != nullptr) {
- METRIC_INCREMENT(*_owner_replica, backup_successful_count);
- }
- return true;
- } else {
- return false;
- }
-}
-
-// run in REPLICATION_LONG thread
-void cold_backup_context::check_backup_on_remote()
-{
- // check whether current checkpoint file is exist on remote, and verify whether the checkpoint
- // directory is exist
- std::string current_chkpt_file = cold_backup::get_current_chkpt_file(
- backup_root, request.app_name, request.pid, request.backup_id);
- dist::block_service::create_file_request req;
- req.file_name = current_chkpt_file;
- req.ignore_metadata = false;
-
- // incr the ref counter, and must release_ref() after callback is execute
- add_ref();
-
- block_service->create_file(
- std::move(req),
- LPC_BACKGROUND_COLD_BACKUP,
- [this, current_chkpt_file](const dist::block_service::create_file_response &resp) {
- if (!is_ready_for_check()) {
- LOG_INFO("{}: backup status has changed to {}, ignore checking backup on remote",
- name,
- cold_backup_status_to_string(status()));
- ignore_check();
- } else if (resp.err == ERR_OK) {
- const dist::block_service::block_file_ptr &file_handle = resp.file_handle;
- CHECK_NOTNULL(file_handle, "");
- if (file_handle->get_md5sum().empty() && file_handle->get_size() <= 0) {
- LOG_INFO("{}: check backup on remote, current_checkpoint file {} is not exist",
- name,
- current_chkpt_file);
- complete_check(false);
- } else {
- LOG_INFO("{}: check backup on remote, current_checkpoint file {} is exist",
- name,
- current_chkpt_file);
- read_current_chkpt_file(file_handle);
- }
- } else if (resp.err == ERR_TIMEOUT) {
- LOG_ERROR(
- "{}: block service create file timeout, retry after 10 seconds, file = {}",
- name,
- current_chkpt_file);
-
- // before retry, should add_ref(), and must release_ref() after retry
- add_ref();
-
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
- nullptr,
- [this]() {
- // before retry, should check whether the status is ready for
- // check
- if (!is_ready_for_check()) {
- LOG_INFO("{}: backup status has changed to {}, ignore "
- "checking backup on remote",
- name,
- cold_backup_status_to_string(status()));
- ignore_check();
- } else {
- check_backup_on_remote();
- }
- release_ref();
- },
- 0,
- std::chrono::seconds(10));
- } else {
- LOG_ERROR("{}: block service create file failed, file = {}, err = {}",
- name,
- current_chkpt_file,
- resp.err);
- fail_check("block service create file failed");
- }
- release_ref();
- });
-}
-
-void cold_backup_context::read_current_chkpt_file(
- const dist::block_service::block_file_ptr &file_handle)
-{
- dist::block_service::read_request req;
- req.remote_pos = 0;
- req.remote_length = -1;
-
- add_ref();
-
- file_handle->read(
- std::move(req),
- LPC_BACKGROUND_COLD_BACKUP,
- [this, file_handle](const dist::block_service::read_response &resp) {
- if (!is_ready_for_check()) {
- LOG_INFO("{}: backup status has changed to {}, ignore checking backup on remote",
- name,
- cold_backup_status_to_string(status()));
- ignore_check();
- } else if (resp.err == ERR_OK) {
- std::string chkpt_dirname(resp.buffer.data(), resp.buffer.length());
- if (chkpt_dirname.empty()) {
- complete_check(false);
- } else {
- LOG_INFO("{}: after read current_checkpoint_file, check whether remote "
- "checkpoint dir = {} is exist",
- name,
- chkpt_dirname);
- remote_chkpt_dir_exist(chkpt_dirname);
- }
- } else if (resp.err == ERR_TIMEOUT) {
- LOG_ERROR("{}: read remote file timeout, retry after 10s, file = {}",
- name,
- file_handle->file_name());
- add_ref();
-
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
- nullptr,
- [this, file_handle]() {
- if (!is_ready_for_check()) {
- LOG_INFO("{}: backup status has changed to {}, ignore "
- "checking backup on remote",
- name,
- cold_backup_status_to_string(status()));
- ignore_check();
- } else {
- read_current_chkpt_file(file_handle);
- }
- release_ref();
- },
- 0,
- std::chrono::seconds(10));
- } else {
- LOG_ERROR("{}: read remote file failed, file = {}, err = {}",
- name,
- file_handle->file_name(),
- resp.err);
- fail_check("read remote file failed");
- }
- release_ref();
- return;
- });
-}
-
-void cold_backup_context::remote_chkpt_dir_exist(const std::string &chkpt_dirname)
-{
- dist::block_service::ls_request req;
- req.dir_name = cold_backup::get_replica_backup_path(
- backup_root, request.app_name, request.pid, request.backup_id);
-
- add_ref();
-
- block_service->list_dir(
- std::move(req),
- LPC_BACKGROUND_COLD_BACKUP,
- [this, chkpt_dirname](const dist::block_service::ls_response &resp) {
- if (!is_ready_for_check()) {
- LOG_INFO("{}: backup status has changed to {}, ignore checking backup on remote",
- name,
- cold_backup_status_to_string(status()));
- ignore_check();
- } else if (resp.err == ERR_OK) {
- bool found_chkpt_dir = false;
- for (const auto &entry : (*resp.entries)) {
- if (entry.is_directory && entry.entry_name == chkpt_dirname) {
- found_chkpt_dir = true;
- break;
- }
- }
- if (found_chkpt_dir) {
- LOG_INFO("{}: remote checkpoint dir is already exist, so upload have already "
- "complete, remote_checkpoint_dirname = {}",
- name,
- chkpt_dirname);
- complete_check(true);
- } else {
- LOG_INFO("{}: remote checkpoint dir is not exist, should re-upload checkpoint "
- "dir, remote_checkpoint_dirname = {}",
- name,
- chkpt_dirname);
- complete_check(false);
- }
- } else if (resp.err == ERR_OBJECT_NOT_FOUND) {
- LOG_INFO("{}: remote checkpoint dir is not exist, should re-upload checkpoint dir, "
- "remote_checkpoint_dirname = {}",
- name,
- chkpt_dirname);
- complete_check(false);
- } else if (resp.err == ERR_TIMEOUT) {
- LOG_ERROR(
- "{}: block service list remote dir timeout, retry after 10s, dirname = {}",
- name,
- chkpt_dirname);
- add_ref();
-
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
- nullptr,
- [this, chkpt_dirname]() {
- if (!is_ready_for_check()) {
- LOG_INFO("{}: backup status has changed to {}, ignore "
- "checking backup on remote",
- name,
- cold_backup_status_to_string(status()));
- ignore_check();
- } else {
- remote_chkpt_dir_exist(chkpt_dirname);
- }
- release_ref();
- },
- 0,
- std::chrono::seconds(10));
- } else {
- LOG_ERROR("{}: block service list remote dir failed, dirname = {}, err = {}",
- name,
- chkpt_dirname,
- resp.err);
- fail_check("list remote dir failed");
- }
- release_ref();
- return;
- });
-}
-
-void cold_backup_context::upload_checkpoint_to_remote()
-{
- if (!is_ready_for_upload()) {
- LOG_INFO("{}: backup status has changed to {}, ignore upload checkpoint",
- name,
- cold_backup_status_to_string(status()));
- return;
- }
-
- bool old_status = false;
- // here, just allow one task to check upload status, and it will set _upload_status base on
- // the result it has checked; But, because of upload_checkpoint_to_remote maybe call multi-times
- // (for pause - uploading), so we use the atomic variant to implement
- if (!_have_check_upload_status.compare_exchange_strong(old_status, true)) {
- LOG_INFO("{}: upload status has already been checked, start upload checkpoint dir directly",
- name);
- on_upload_chkpt_dir();
- return;
- }
-
- // check whether cold_backup_metadata is exist and verify cold_backup_metadata if exist
- std::string metadata = cold_backup::get_remote_chkpt_meta_file(
- backup_root, request.app_name, request.pid, request.backup_id);
- dist::block_service::create_file_request req;
- req.file_name = metadata;
- req.ignore_metadata = false;
-
- add_ref();
-
- block_service->create_file(
- std::move(req),
- LPC_BACKGROUND_COLD_BACKUP,
- [this, metadata](const dist::block_service::create_file_response &resp) {
- if (resp.err == ERR_OK) {
- CHECK_NOTNULL(resp.file_handle, "");
- if (resp.file_handle->get_md5sum().empty() && resp.file_handle->get_size() <= 0) {
- _upload_status.store(UploadUncomplete);
- LOG_INFO("{}: check upload_status complete, cold_backup_metadata isn't exist, "
- "start upload checkpoint dir",
- name);
- on_upload_chkpt_dir();
- } else {
- LOG_INFO("{}: cold_backup_metadata is exist, read it's context", name);
- read_backup_metadata(resp.file_handle);
- }
- } else if (resp.err == ERR_TIMEOUT) {
- LOG_ERROR("{}: block service create file timeout, retry after 10s, file = {}",
- name,
- metadata);
- // when create backup_metadata timeout, should reset _have_check_upload_status
- // false to allow re-check
- _have_check_upload_status.store(false);
- add_ref();
-
- tasking::enqueue(
- LPC_BACKGROUND_COLD_BACKUP,
- nullptr,
- [this]() {
- if (!is_ready_for_upload()) {
- LOG_INFO(
- "{}: backup status has changed to {}, stop check upload status",
- name,
- cold_backup_status_to_string(status()));
- } else {
- upload_checkpoint_to_remote();
- }
- release_ref();
- },
- 0,
- std::chrono::seconds(10));
- } else {
- LOG_ERROR("{}: block service create file failed, file = {}, err = {}",
- name,
- metadata,
- resp.err);
- _have_check_upload_status.store(false);
- fail_upload("block service create file failed");
- }
- release_ref();
- return;
- });
-}
-
-void cold_backup_context::read_backup_metadata(
- const dist::block_service::block_file_ptr &file_handle)
-{
- dist::block_service::read_request req;
- req.remote_pos = 0;
- req.remote_length = -1;
-
- add_ref();
-
- file_handle->read(
- std::move(req),
- LPC_BACKGROUND_COLD_BACKUP,
- [this, file_handle](const dist::block_service::read_response &resp) {
- if (resp.err == ERR_OK) {
- LOG_INFO("{}: read cold_backup_metadata succeed, verify it's context, file = {}",
- name,
- file_handle->file_name());
- verify_backup_metadata(resp.buffer);
- on_upload_chkpt_dir();
- } else if (resp.err == ERR_TIMEOUT) {
- LOG_ERROR("{}: read remote file timeout, retry after 10s, file = {}",
- name,
- file_handle->file_name());
- add_ref();
-
- tasking::enqueue(
- LPC_BACKGROUND_COLD_BACKUP,
- nullptr,
- [this, file_handle] {
- if (!is_ready_for_upload()) {
- LOG_INFO(
- "{}: backup status has changed to {}, stop check upload status",
- name,
- cold_backup_status_to_string(status()));
- _have_check_upload_status.store(false);
- } else {
- read_backup_metadata(file_handle);
- }
- release_ref();
- },
- 0,
- std::chrono::seconds(10));
- } else {
- LOG_ERROR("{}: read remote file failed, file = {}, err = {}",
- name,
- file_handle->file_name(),
- resp.err);
- _have_check_upload_status.store(false);
- fail_upload("read remote file failed");
- }
- release_ref();
- return;
- });
-}
-
-void cold_backup_context::verify_backup_metadata(const blob &value)
-{
- cold_backup_metadata tmp;
- if (value.length() > 0 && json::json_forwarder<cold_backup_metadata>::decode(value, tmp)) {
- LOG_INFO("{}: check upload status complete, checkpoint dir uploading has already complete",
- name);
- _upload_status.store(UploadComplete);
- } else {
- LOG_INFO("{}: check upload status complete, checkpoint dir uploading isn't complete yet",
- name);
- _upload_status.store(UploadUncomplete);
- }
-}
-
-void cold_backup_context::on_upload_chkpt_dir()
-{
- if (_upload_status.load() == UploadInvalid || !is_ready_for_upload()) {
- LOG_INFO("{}: replica is not ready for uploading, ignore upload, cold_backup_status({})",
- name,
- cold_backup_status_to_string(status()));
- return;
- }
-
- if (_upload_status.load() == UploadComplete) {
- // TODO: if call upload_checkpint_to_remote multi times, maybe write_current_chkpt_file
- // multi times
- std::string chkpt_dirname = cold_backup::get_remote_chkpt_dirname();
- write_current_chkpt_file(chkpt_dirname);
- return;
- }
-
- prepare_upload();
-
- // prepare_upload maybe fail, so here check status
- if (!is_ready_for_upload()) {
- LOG_ERROR("{}: backup status has changed to {}, stop upload checkpoint dir",
- name,
- cold_backup_status_to_string(status()));
- return;
- }
-
- if (checkpoint_files.size() <= 0) {
- LOG_INFO("{}: checkpoint dir is empty, so upload is complete and just start write "
- "backup_metadata",
- name);
- bool old_status = false;
- // using atomic variant _have_write_backup_metadata is to allow one task to
- // write backup_metadata because on_upload_chkpt_dir maybe call multi-time
- if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) {
- write_backup_metadata();
- }
- } else {
- LOG_INFO("{}: start upload checkpoint dir, checkpoint dir = {}, total checkpoint file = {}",
- name,
- checkpoint_dir,
- checkpoint_files.size());
- std::vector<std::string> files;
- if (!upload_complete_or_fetch_uncomplete_files(files)) {
- for (auto &file : files) {
- LOG_INFO("{}: start upload checkpoint file to remote, file = {}", name, file);
- upload_file(file);
- }
- } else {
- LOG_INFO("{}: upload checkpoint dir to remote complete, total_file_cnt = {}",
- name,
- checkpoint_files.size());
- bool old_status = false;
- if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) {
- write_backup_metadata();
- }
- }
- }
-}
-
-void cold_backup_context::prepare_upload()
-{
- zauto_lock l(_lock);
- // only need initialize once
- if (_metadata.files.size() > 0) {
- return;
- }
- _file_remain_cnt = checkpoint_files.size();
-
- _metadata.checkpoint_decree = checkpoint_decree;
- _metadata.checkpoint_timestamp = checkpoint_timestamp;
- _metadata.checkpoint_total_size = checkpoint_file_total_size;
- for (int32_t idx = 0; idx < checkpoint_files.size(); idx++) {
- std::string &file = checkpoint_files[idx];
- file_meta f_meta;
- f_meta.name = file;
- std::string file_full_path = ::dsn::utils::filesystem::path_combine(checkpoint_dir, file);
- int64_t file_size = checkpoint_file_sizes[idx];
- std::string file_md5;
- if (::dsn::utils::filesystem::md5sum(file_full_path, file_md5) != ERR_OK) {
- LOG_ERROR("{}: get local file size or md5 fail, file = {}", name, file_full_path);
- fail_upload("compute local file size or md5 failed");
- return;
- }
- f_meta.md5 = file_md5;
- f_meta.size = file_size;
- _metadata.files.emplace_back(f_meta);
- _file_status.insert(std::make_pair(file, FileUploadUncomplete));
- _file_infos.insert(std::make_pair(file, std::make_pair(file_size, file_md5)));
- }
- _upload_file_size.store(0);
-}
-
-void cold_backup_context::upload_file(const std::string &local_filename)
-{
- std::string remote_chkpt_dir = cold_backup::get_remote_chkpt_dir(
- backup_root, request.app_name, request.pid, request.backup_id);
- dist::block_service::create_file_request req;
- req.file_name = ::dsn::utils::filesystem::path_combine(remote_chkpt_dir, local_filename);
- req.ignore_metadata = false;
-
- add_ref();
-
- block_service->create_file(
- std::move(req),
- LPC_BACKGROUND_COLD_BACKUP,
- [this, local_filename](const dist::block_service::create_file_response &resp) {
- if (resp.err == ERR_OK) {
- const dist::block_service::block_file_ptr &file_handle = resp.file_handle;
- CHECK_NOTNULL(file_handle, "");
- int64_t local_file_size = _file_infos.at(local_filename).first;
- std::string md5 = _file_infos.at(local_filename).second;
- std::string full_path_local_file =
- ::dsn::utils::filesystem::path_combine(checkpoint_dir, local_filename);
- if (md5 == file_handle->get_md5sum() &&
- local_file_size == file_handle->get_size()) {
- LOG_INFO("{}: checkpoint file already exist on remote, file = {}",
- name,
- full_path_local_file);
- on_upload_file_complete(local_filename);
- } else {
- LOG_INFO("{}: start upload checkpoint file to remote, file = {}",
- name,
- full_path_local_file);
- on_upload(file_handle, full_path_local_file);
- }
- } else if (resp.err == ERR_TIMEOUT) {
- LOG_ERROR("{}: block service create file timeout, retry after 10s, file = {}",
- name,
- local_filename);
- add_ref();
-
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
- nullptr,
- [this, local_filename]() {
- // TODO: status change from ColdBackupUploading to
- // ColdBackupPaused, and upload file timeout, but when callback
- // is executed it catches the status(ColdBackupPaused)
- // now, if status back to ColdBackupUploading very soon, and
- // call upload_checkpoint_to_remote() here,
- // upload_checkpoint_to_remote() maybe acquire the _lock first,
- // then stop give back file(upload timeout), the file is still
- // in uploading this file will not be uploaded until you call
- // upload_checkpoint_to_remote() after it's given back
- if (!is_ready_for_upload()) {
- std::string full_path_local_file =
- ::dsn::utils::filesystem::path_combine(checkpoint_dir,
- local_filename);
- LOG_INFO("{}: backup status has changed to {}, stop "
- "upload checkpoint file to remote, file = {}",
- name,
- cold_backup_status_to_string(status()),
- full_path_local_file);
- file_upload_uncomplete(local_filename);
- } else {
- upload_file(local_filename);
- }
- release_ref();
- },
- 0,
- std::chrono::seconds(10));
- } else {
- LOG_ERROR("{}: block service create file failed, file = {}, err = {}",
- name,
- local_filename,
- resp.err);
- fail_upload("create file failed");
- }
- if (resp.err != ERR_OK && _owner_replica != nullptr) {
- METRIC_INCREMENT(*_owner_replica, backup_file_upload_failed_count);
- }
- release_ref();
- return;
- });
-}
-
-void cold_backup_context::on_upload(const dist::block_service::block_file_ptr &file_handle,
- const std::string &full_path_local_file)
-{
- dist::block_service::upload_request req;
- req.input_local_name = full_path_local_file;
-
- add_ref();
-
- file_handle->upload(
- std::move(req),
- LPC_BACKGROUND_COLD_BACKUP,
- [this, file_handle, full_path_local_file](
- const dist::block_service::upload_response &resp) {
- if (resp.err == ERR_OK) {
- std::string local_filename =
- ::dsn::utils::filesystem::get_file_name(full_path_local_file);
- CHECK_EQ(_file_infos.at(local_filename).first,
- static_cast<int64_t>(resp.uploaded_size));
- LOG_INFO(
- "{}: upload checkpoint file complete, file = {}", name, full_path_local_file);
- on_upload_file_complete(local_filename);
- } else if (resp.err == ERR_TIMEOUT) {
- LOG_ERROR("{}: upload checkpoint file timeout, retry after 10s, file = {}",
- name,
- full_path_local_file);
- add_ref();
-
- tasking::enqueue(
- LPC_BACKGROUND_COLD_BACKUP,
- nullptr,
- [this, file_handle, full_path_local_file]() {
- if (!is_ready_for_upload()) {
- LOG_ERROR("{}: backup status has changed to {}, stop upload "
- "checkpoint file to remote, file = {}",
- name,
- cold_backup_status_to_string(status()),
- full_path_local_file);
- std::string local_filename =
- ::dsn::utils::filesystem::get_file_name(full_path_local_file);
- file_upload_uncomplete(local_filename);
- } else {
- on_upload(file_handle, full_path_local_file);
- }
- release_ref();
- },
- 0,
- std::chrono::seconds(10));
- } else {
- LOG_ERROR("{}: upload checkpoint file to remote failed, file = {}, err = {}",
- name,
- full_path_local_file,
- resp.err);
- fail_upload("upload checkpoint file to remote failed");
- }
- if (resp.err != ERR_OK && _owner_replica != nullptr) {
- METRIC_INCREMENT(*_owner_replica, backup_file_upload_failed_count);
- }
- release_ref();
- return;
- });
-}
-
-void cold_backup_context::write_backup_metadata()
-{
- if (_upload_status.load() == UploadComplete) {
- LOG_INFO("{}: upload have already done, no need write metadata again", name);
- return;
- }
- std::string metadata = cold_backup::get_remote_chkpt_meta_file(
- backup_root, request.app_name, request.pid, request.backup_id);
- dist::block_service::create_file_request req;
- req.file_name = metadata;
- req.ignore_metadata = true;
-
- add_ref();
-
- block_service->create_file(
- std::move(req),
- LPC_BACKGROUND_COLD_BACKUP,
- [this, metadata](const dist::block_service::create_file_response &resp) {
- if (resp.err == ERR_OK) {
- CHECK_NOTNULL(resp.file_handle, "");
- blob buffer = json::json_forwarder<cold_backup_metadata>::encode(_metadata);
- // hold itself until callback is executed
- add_ref();
- LOG_INFO("{}: create backup metadata file succeed, start to write file, file = {}",
- name,
- metadata);
- this->on_write(resp.file_handle, buffer, [this](bool succeed) {
- if (succeed) {
- std::string chkpt_dirname = cold_backup::get_remote_chkpt_dirname();
- _upload_status.store(UploadComplete);
- LOG_INFO(
- "{}: write backup metadata complete, write current checkpoint file",
- name);
- write_current_chkpt_file(chkpt_dirname);
- }
- // NOTICE: write file fail will internal error be processed in on_write()
- release_ref();
- });
- } else if (resp.err == ERR_TIMEOUT) {
- LOG_ERROR("{}: block service create file timeout, retry after 10s, file = {}",
- name,
- metadata);
- add_ref();
-
- tasking::enqueue(
- LPC_BACKGROUND_COLD_BACKUP,
- nullptr,
- [this]() {
- if (!is_ready_for_upload()) {
- _have_write_backup_metadata.store(false);
- LOG_ERROR(
- "{}: backup status has changed to {}, stop write backup_metadata",
- name,
- cold_backup_status_to_string(status()));
- } else {
- write_backup_metadata();
- }
- release_ref();
- },
- 0,
- std::chrono::seconds(10));
- } else {
- LOG_ERROR("{}: block service create file failed, file = {}, err = {}",
- name,
- metadata,
- resp.err);
- _have_write_backup_metadata.store(false);
- fail_upload("create file failed");
- }
- release_ref();
- return;
- });
-}
-
-void cold_backup_context::write_current_chkpt_file(const std::string &value)
-{
- // before we write current checkpoint file, we can release the memory occupied by _metadata,
- // _file_status and _file_infos, because even if write current checkpoint file failed, the
- // backup_metadata is uploading succeed, so we will not re-upload
- _metadata.files.clear();
- _file_infos.clear();
- _file_status.clear();
-
- if (!is_ready_for_upload()) {
- LOG_INFO("{}: backup status has changed to {}, stop write current checkpoint file",
- name,
- cold_backup_status_to_string(status()));
- return;
- }
-
- std::string current_chkpt_file = cold_backup::get_current_chkpt_file(
- backup_root, request.app_name, request.pid, request.backup_id);
- dist::block_service::create_file_request req;
- req.file_name = current_chkpt_file;
- req.ignore_metadata = true;
-
- add_ref();
-
- block_service->create_file(
- std::move(req),
- LPC_BACKGROUND_COLD_BACKUP,
- [this, value, current_chkpt_file](const dist::block_service::create_file_response &resp) {
- if (resp.err == ERR_OK) {
- CHECK_NOTNULL(resp.file_handle, "");
- auto len = value.length();
- std::shared_ptr<char> buf = utils::make_shared_array<char>(len);
- ::memcpy(buf.get(), value.c_str(), len);
- blob write_buf(std::move(buf), static_cast<unsigned int>(len));
- LOG_INFO("{}: create current checkpoint file succeed, start write file ,file = {}",
- name,
- current_chkpt_file);
- add_ref();
- this->on_write(resp.file_handle, write_buf, [this](bool succeed) {
- if (succeed) {
- complete_upload();
- }
- release_ref();
- });
- } else if (resp.err == ERR_TIMEOUT) {
- LOG_ERROR("{}: block file create file timeout, retry after 10s, file = {}",
- name,
- current_chkpt_file);
- add_ref();
-
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
- nullptr,
- [this, value]() {
- if (!is_ready_for_upload()) {
- LOG_INFO("{}: backup status has changed to {}, stop write "
- "current checkpoint file",
- name,
- cold_backup_status_to_string(status()));
- } else {
- write_current_chkpt_file(value);
- }
-
- release_ref();
- },
- 0,
- std::chrono::seconds(10));
- } else {
- LOG_ERROR("{}: block service create file failed, file = {}, err = {}",
- name,
- current_chkpt_file,
- resp.err);
- fail_upload("create file failed");
- }
- release_ref();
- return;
- });
-}
-
-void cold_backup_context::on_write(const dist::block_service::block_file_ptr &file_handle,
- const blob &value,
- const std::function<void(bool)> &callback)
-{
- CHECK_NOTNULL(file_handle, "");
- dist::block_service::write_request req;
- req.buffer = value;
-
- add_ref();
-
- file_handle->write(
- std::move(req),
- LPC_BACKGROUND_COLD_BACKUP,
- [this, value, file_handle, callback](const dist::block_service::write_response &resp) {
- if (resp.err == ERR_OK) {
- LOG_INFO(
- "{}: write remote file succeed, file = {}", name, file_handle->file_name());
- callback(true);
- } else if (resp.err == ERR_TIMEOUT) {
- LOG_INFO("{}: write remote file timeout, retry after 10s, file = {}",
- name,
- file_handle->file_name());
- add_ref();
-
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
- nullptr,
- [this, file_handle, value, callback]() {
- if (!is_ready_for_upload()) {
- LOG_INFO("{}: backup status has changed to {}, stop write "
- "remote file, file = {}",
- name,
- cold_backup_status_to_string(status()),
- file_handle->file_name());
- } else {
- on_write(file_handle, value, callback);
- }
- release_ref();
- },
- 0,
- std::chrono::seconds(10));
- } else {
- // here, must call the callback to release_ref
- callback(false);
- LOG_ERROR("{}: write remote file failed, file = {}, err = {}",
- name,
- file_handle->file_name(),
- resp.err);
- fail_upload("write remote file failed");
- }
- release_ref();
- return;
- });
-}
-
-void cold_backup_context::on_upload_file_complete(const std::string &local_filename)
-{
- const int64_t &f_size = _file_infos.at(local_filename).first;
- _upload_file_size.fetch_add(f_size);
- file_upload_complete(local_filename);
- if (_owner_replica != nullptr) {
- METRIC_INCREMENT(*_owner_replica, backup_file_upload_successful_count);
- METRIC_INCREMENT_BY(*_owner_replica, backup_file_upload_total_bytes, f_size);
- }
- // update progress
- // int a = 10; int b = 3; then b/a = 0;
- // double a = 10; double b = 3; then b/a = 0.3
- auto total = static_cast<double>(checkpoint_file_total_size);
- auto complete_size = static_cast<double>(_upload_file_size.load());
-
- if (total <= complete_size) {
- LOG_INFO("{}: upload checkpoint to remote complete, checkpoint dir = {}, total file size "
- "= {}, file count = {}",
- name,
- checkpoint_dir,
- total,
- checkpoint_files.size());
- bool old_status = false;
- if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) {
- write_backup_metadata();
- }
- return;
- } else {
- CHECK_GT(total, 0.0);
- update_progress(static_cast<int>(complete_size / total * 1000));
- LOG_INFO("{}: the progress of upload checkpoint is {}", name, _progress.load());
- }
- if (is_ready_for_upload()) {
- std::vector<std::string> upload_files;
- upload_complete_or_fetch_uncomplete_files(upload_files);
- for (auto &file : upload_files) {
- LOG_INFO("{}: start upload checkpoint file to remote, file = {}", name, file);
- upload_file(file);
- }
- }
-}
-
-bool cold_backup_context::upload_complete_or_fetch_uncomplete_files(std::vector<std::string> &files)
-{
- bool upload_complete = false;
-
- zauto_lock l(_lock);
- if (_file_remain_cnt > 0 && _cur_upload_file_cnt < _max_concurrent_uploading_file_cnt) {
- for (const auto &_pair : _file_status) {
- if (_pair.second == file_status::FileUploadUncomplete) {
- files.emplace_back(_pair.first);
- _file_remain_cnt -= 1;
- _file_status[_pair.first] = file_status::FileUploading;
- _cur_upload_file_cnt += 1;
- }
- if (_file_remain_cnt <= 0 ||
- _cur_upload_file_cnt >= _max_concurrent_uploading_file_cnt) {
- break;
- }
- }
- }
- if (_file_remain_cnt <= 0 && _cur_upload_file_cnt <= 0) {
- upload_complete = true;
- }
- return upload_complete;
-}
-
-void cold_backup_context::file_upload_uncomplete(const std::string &filename)
-{
- zauto_lock l(_lock);
-
- CHECK_GE(_cur_upload_file_cnt, 1);
- _cur_upload_file_cnt -= 1;
- _file_remain_cnt += 1;
- _file_status[filename] = file_status::FileUploadUncomplete;
-}
-
-void cold_backup_context::file_upload_complete(const std::string &filename)
-{
- zauto_lock l(_lock);
-
- CHECK_GE(_cur_upload_file_cnt, 1);
- _cur_upload_file_cnt -= 1;
- _file_status[filename] = file_status::FileUploadComplete;
-}
-
-} // namespace replication
-} // namespace dsn
diff --git a/src/replica/backup/cold_backup_context.h b/src/replica/backup/cold_backup_context.h
deleted file mode 100644
index 0d2f07f..0000000
--- a/src/replica/backup/cold_backup_context.h
+++ /dev/null
@@ -1,390 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <inttypes.h>
-#include <stdio.h>
-#include <string.h>
-#include <atomic>
-#include <functional>
-#include <map>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "backup_types.h"
-#include "block_service/block_service.h"
-#include "common/backup_common.h"
-#include "common/gpid.h"
-#include "common/json_helper.h"
-#include "common/replication_other_types.h"
-#include "metadata_types.h"
-#include "utils/autoref_ptr.h"
-#include "utils/fmt_logging.h"
-#include "utils/zlocks.h"
-
-class replication_service_test_app;
-
-namespace dsn {
-class blob;
-
-namespace replication {
-
-class replica;
-
-//
-// ColdBackupInvalid
-// |
-// V
-// |<------ ColdBackupChecking ---------------------------------->|
-// | | |
-// | V |
-// | ColdBackupChecked ----------------------------------->|
-// | | |
-// | V |
-// ColdBackupCompleted <---| ColdBackupCheckpointing ----------------------------->|
-// | | | |
-// | | V |--->
-// ColdBackupCanceled
-// | | ColdBackupCheckpointed ------------------------------>|
-// | | | |
-// | | V |
-// | |<------ ColdBackupUploading <======> ColdBackupPaused ------>|
-// | | | |
-// | |____________________________| |
-// | | |
-// | V |
-// | ColdBackupFailed -------------------->|
-// | |
-// |---------------------------------------------------------------------------->|
-//
-enum cold_backup_status
-{
- ColdBackupInvalid = 0,
- ColdBackupChecking,
- ColdBackupChecked,
- ColdBackupCheckpointing,
- ColdBackupCheckpointed,
- ColdBackupUploading,
- ColdBackupPaused,
- ColdBackupCanceled,
- ColdBackupCompleted,
- ColdBackupFailed
-};
-const char *cold_backup_status_to_string(cold_backup_status status);
-
-struct cold_backup_metadata
-{
- int64_t checkpoint_decree;
- int64_t checkpoint_timestamp;
- std::vector<file_meta> files;
- int64_t checkpoint_total_size;
- DEFINE_JSON_SERIALIZATION(checkpoint_decree, checkpoint_timestamp, files, checkpoint_total_size)
-};
-
-//
-// the process of uploading the checkpoint directory to block filesystem:
-// 1, upload all the file of the checkpoint to block filesystem
-// 2, write a cold_backup_metadata to block filesystem(which includes all the file's name, size
-// and md5 and so on)
-// 3, write a current_checkpoint file to block filesystem, which is used to mark which
-// checkpoint is invalid
-//
-
-//
-// the process of check whether uploading is finished on block filesystem:
-// 1, check whether the current checkpoint file exist, if exist continue, otherwise not finish
-// 2, read the context of the current checkpoint file, the context of this file is the valid
-// checkpoint dirname on block filesystem
-// 3, verify whether the checkpoint dirname is exist, if exist uploading is already finished,
-// otherwise uploading is not finished
-//
-
-class cold_backup_context : public ref_counter
-{
-public:
- explicit cold_backup_context(replica *r_,
- const backup_request &request_,
- int max_upload_file_cnt)
- : request(request_),
- block_service(nullptr),
- checkpoint_decree(0),
- checkpoint_timestamp(0),
- durable_decree_when_checkpoint(-1),
- checkpoint_file_total_size(0),
- _status(ColdBackupInvalid),
- _progress(0),
- _upload_file_size(0),
- _have_check_upload_status(false),
- _have_write_backup_metadata(false),
- _upload_status(UploadInvalid),
- _max_concurrent_uploading_file_cnt(max_upload_file_cnt),
- _cur_upload_file_cnt(0),
- _file_remain_cnt(0),
- _owner_replica(r_),
- _start_time_ms(0)
- {
- sprintf(name,
- "backup{%d.%d.%s.%" PRId64 "}",
- request.pid.get_app_id(),
- request.pid.get_partition_index(),
- request.policy.policy_name.c_str(),
- request.backup_id);
- memset(_reason, 0, sizeof(_reason));
- }
-
- ~cold_backup_context() {}
-
- // cancel backup.
- // {*} --> ColdBackupCanceled
- //
- // Will be called in replication thread.
- void cancel();
-
- // start checking backup on remote.
- // ColdBackupInvalid --> ColdBackupChecking
- // Returns:
- // - true if status is successfully changed to ColdBackupChecking.
- bool start_check();
-
- // ignore checking backup on remote and switch backward status.
- // ColdBackupChecking --> ColdBackupInvalid
- // Returns:
- // - true if status is successfully changed to ColdBackupInvalid.
- bool ignore_check()
- {
- int checking = ColdBackupChecking;
- return _status.compare_exchange_strong(checking, ColdBackupInvalid);
- }
-
- // mark failed when checking backup on remote.
- // ColdBackupChecking --> ColdBackupFailed
- // Returns:
- // - true if status is successfully changed to ColdBackupFailed.
- bool fail_check(const char *failure_reason);
-
- // complete checking backup on remote.
- // ColdBackupChecking --> { ColdBackupChecked | ColdBackupCompleted }
- // Returns:
- // - true if status is successfully changed to ColdBackupChecked or ColdBackupCompleted.
- bool complete_check(bool uploaded);
-
- // start generating checkpoint.
- // ColdBackupChecked --> ColdBackupCheckpointing
- // Returns:
- // - true if status is successfully changed to ColdBackupCheckpointing.
- bool start_checkpoint();
-
- // ignore generating checkpoint and switch backward status.
- // ColdBackupCheckpointing --> ColdBackupChecked
- // Returns:
- // - true if status is successfully changed to ColdBackupChecked.
- bool ignore_checkpoint()
- {
- int checkpointing = ColdBackupCheckpointing;
- return _status.compare_exchange_strong(checkpointing, ColdBackupChecked);
- }
-
- // mark failed when generating checkpoint.
- // ColdBackupCheckpointing --> ColdBackupFailed
- // Returns:
- // - true if status is successfully changed to ColdBackupFailed.
- bool fail_checkpoint(const char *failure_reason);
-
- // complete generating checkpoint.
- // ColdBackupCheckpointing --> ColdBackupCheckpointed
- // Returns:
- // - true if status is successfully changed to ColdBackupCheckpointed.
- bool complete_checkpoint();
-
- // start uploading checkpoint to remote.
- // { ColdBackupCheckpointed | ColdBackupPaused } --> ColdBackupUploading
- //
- // Will be called in replication thread.
- // Returns:
- // - true if status is successfully changed to ColdBackupUploading.
- bool start_upload()
- {
- int checkpointed = ColdBackupCheckpointed;
- int paused = ColdBackupPaused;
- return _status.compare_exchange_strong(checkpointed, ColdBackupUploading) ||
- _status.compare_exchange_strong(paused, ColdBackupUploading);
- }
-
- // mark failed when uploading checkpoint to remote.
- // { ColdBackupUploading | ColdBackupPaused } --> ColdBackupFailed
- // Returns:
- // - true if status is successfully changed to ColdBackupFailed.
- bool fail_upload(const char *failure_reason);
-
- // complete uploading checkpoint to remote.
- // { ColdBackupUploading | ColdBackupPaused } --> ColdBackupCompleted
- // Returns:
- // - true if status is successfully changed to ColdBackupCompleted.
- bool complete_upload();
-
- // update progress.
- // Progress should be in range of [0, 1000].
- void update_progress(int progress)
- {
- CHECK(progress >= 0 && progress <= cold_backup_constant::PROGRESS_FINISHED,
- "invalid progress {}",
- progress);
- _progress.store(progress);
- }
-
- // check if it is ready for checking.
- bool is_ready_for_check() const { return _status.load() == ColdBackupChecking; }
-
- // check if it is ready for checkpointing.
- bool is_checkpointing() const { return _status.load() == ColdBackupCheckpointing; }
-
- // check if it is ready for uploading.
- bool is_ready_for_upload() const { return _status.load() == ColdBackupUploading; }
-
- // get current status.
- cold_backup_status status() const { return (cold_backup_status)_status.load(); }
-
- // get current progress.
- int progress() const { return _progress.load(); }
-
- // get failure reason.
- const char *reason() const { return _reason; }
-
- // check if backup is aleady exist on remote.
- // Preconditions:
- // - name/request are set
- // - checkpoint_dir/checkpoint_decree/checkpoint_files are not set
- // - status is one of { ColdBackupChecking, ColdBackupCanceled }
- // Will be called in background thread.
- void check_backup_on_remote();
-
- // upload backup checkpoint to remote.
- // Preconditions:
- // - name/request are set
- // - checkpoint_dir/checkpoint_decree/checkpoint_files are set
- // - status is one of { ColdBackupUploading, ColdBackupPaused, ColdBackupCanceled }
- // Will be called in background thread.
- void upload_checkpoint_to_remote();
-
- uint64_t get_start_time_ms() { return _start_time_ms; }
-
- uint64_t get_upload_file_size() { return _upload_file_size.load(); }
-
- int64_t get_checkpoint_total_size() { return checkpoint_file_total_size; }
-
-private:
- void read_current_chkpt_file(const dist::block_service::block_file_ptr &file_handle);
- void remote_chkpt_dir_exist(const std::string &chkpt_dirname);
-
- void read_backup_metadata(const dist::block_service::block_file_ptr &file_handle);
- // value is a json string, verify it's validity
- // validity means uploading checkpoint directory complete, so just write_current_chkpt_file
- // otherwise, upload checkpoint directory
- void verify_backup_metadata(const blob &value);
- // after upload_checkpoint_directory ---> write_backup_metadata --> write_current_chkpt_file -->
- // notify meta
- void write_backup_metadata();
-
- void write_current_chkpt_file(const std::string &value);
- // write value to file, if succeed then callback(true), else callback(false)
- void on_write(const dist::block_service::block_file_ptr &file_handle,
- const blob &value,
- const std::function<void(bool)> &callback);
- void prepare_upload();
- void on_upload_chkpt_dir();
- void upload_file(const std::string &local_filename);
- void on_upload(const dist::block_service::block_file_ptr &file_handle,
- const std::string &full_path_local_file);
- void on_upload_file_complete(const std::string &local_filename);
-
- // functions access the structure protected by _lock
- // return:
- // -- true, uploading is complete
- // -- false, uploading is not complete; and put uncomplete file into 'files'
- bool upload_complete_or_fetch_uncomplete_files(std::vector<std::string> &files);
- void file_upload_uncomplete(const std::string &filename);
- void file_upload_complete(const std::string &filename);
-
-public:
- /// the following variables are public, and will only be set once, and will not be changed once
- /// set.
- char name[256]; // backup{<app_id>.<partition_index>.<policy_name>.<backup_id>}
- // all logging should print the name
- backup_request request;
- dist::block_service::block_filesystem *block_service;
- std::string backup_root;
- decree checkpoint_decree;
- int64_t checkpoint_timestamp;
- decree durable_decree_when_checkpoint;
- std::string checkpoint_dir;
- std::vector<std::string> checkpoint_files;
- std::vector<int64_t> checkpoint_file_sizes;
- int64_t checkpoint_file_total_size;
-
-private:
- friend class ::replication_service_test_app;
-
- /// state variables
- std::atomic_int _status;
- std::atomic_int _progress; // [0,1000], 1000 means completed
- char _reason[1024]; // failure reason
-
- std::atomic_llong _upload_file_size;
- // TODO: if chechpoint directory has many files, cold_backup_metadata may
- // occupy large amount of memory
- // for example, if a single file occupy 32B, then 1,000,000 files may occupy 32MB
- cold_backup_metadata _metadata;
-
- enum upload_status
- {
- UploadInvalid = 0,
- UploadUncomplete,
- UploadComplete
- };
- enum file_status
- {
- FileUploadUncomplete = 0,
- FileUploading,
- FileUploadComplete
- };
-
- // two atomic variants is to ensure check_upload_status and write_backup_metadata just be
- // executed once
- std::atomic_bool _have_check_upload_status;
- std::atomic_bool _have_write_backup_metadata;
-
- std::atomic_int _upload_status;
-
- int32_t _max_concurrent_uploading_file_cnt;
- // filename -> <filesize, md5>
- std::map<std::string, std::pair<int64_t, std::string>> _file_infos;
-
- zlock _lock; // lock the structure below
- std::map<std::string, file_status> _file_status;
- int32_t _cur_upload_file_cnt;
- int32_t _file_remain_cnt;
-
- replica *_owner_replica;
- uint64_t _start_time_ms;
-};
-
-typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
-
-} // namespace replication
-} // namespace dsn
diff --git a/src/replica/backup/replica_backup_manager.cpp b/src/replica/backup/replica_backup_manager.cpp
index 2fb70b7..9b354a6 100644
--- a/src/replica/backup/replica_backup_manager.cpp
+++ b/src/replica/backup/replica_backup_manager.cpp
@@ -18,31 +18,10 @@
#include "replica_backup_manager.h"
#include <absl/strings/string_view.h>
-#include <stdint.h>
-#include <algorithm>
-#include <chrono>
-#include <map>
-#include <memory>
-#include <utility>
-#include <vector>
-#include "backup_types.h"
-#include "cold_backup_context.h"
-#include "common/gpid.h"
-#include "common/replication.codes.h"
-#include "dsn.layer2_types.h"
-#include "metadata_types.h"
#include "replica/replica.h"
-#include "replica/replica_context.h"
-#include "replica/replication_app_base.h"
-#include "runtime/api_layer1.h"
-#include "runtime/task/async_calls.h"
-#include "utils/autoref_ptr.h"
-#include "utils/filesystem.h"
#include "utils/flags.h"
-#include "utils/fmt_logging.h"
-#include "utils/strings.h"
-#include "utils/thread_access_checker.h"
+#include "utils/metrics.h"
METRIC_DEFINE_gauge_int64(replica,
backup_running_count,
@@ -65,179 +44,11 @@
namespace dsn {
namespace replication {
-// returns true if this checkpoint dir belongs to the policy
-static bool is_policy_checkpoint(const std::string &chkpt_dirname, const std::string &policy_name)
-{
- std::vector<std::string> strs;
- utils::split_args(chkpt_dirname.c_str(), strs, '.');
- // backup_tmp.<policy_name>.* or backup.<policy_name>.*
- return strs.size() >= 2 &&
- (strs[0] == std::string("backup_tmp") || strs[0] == std::string("backup")) &&
- strs[1] == policy_name;
-}
+// TODO(heyuchen): implement it
-// get all backup checkpoint dirs which belong to the policy
-static bool get_policy_checkpoint_dirs(const std::string &dir,
- const std::string &policy,
- /*out*/ std::vector<std::string> &chkpt_dirs)
-{
- chkpt_dirs.clear();
- // list sub dirs
- std::vector<std::string> sub_dirs;
- if (!utils::filesystem::get_subdirectories(dir, sub_dirs, false)) {
- LOG_ERROR("list sub dirs of dir {} failed", dir.c_str());
- return false;
- }
+replica_backup_manager::replica_backup_manager(replica *r) : replica_base(r), _replica(r) {}
- for (std::string &d : sub_dirs) {
- std::string dirname = utils::filesystem::get_file_name(d);
- if (is_policy_checkpoint(dirname, policy)) {
- chkpt_dirs.push_back(std::move(dirname));
- }
- }
- return true;
-}
-
-replica_backup_manager::replica_backup_manager(replica *r)
- : replica_base(r),
- _replica(r),
- METRIC_VAR_INIT_replica(backup_running_count),
- METRIC_VAR_INIT_replica(backup_max_duration_ms),
- METRIC_VAR_INIT_replica(backup_file_upload_max_bytes)
-{
-}
-
-replica_backup_manager::~replica_backup_manager()
-{
- if (_collect_info_timer != nullptr) {
- _collect_info_timer->cancel(true);
- }
-}
-
-void replica_backup_manager::on_clear_cold_backup(const backup_clear_request &request)
-{
- _replica->_checker.only_one_thread_access();
-
- auto find = _replica->_cold_backup_contexts.find(request.policy_name);
- if (find != _replica->_cold_backup_contexts.end()) {
- cold_backup_context_ptr backup_context = find->second;
- if (backup_context->is_checkpointing()) {
- LOG_INFO_PREFIX(
- "{}: delay clearing obsoleted cold backup context, cause backup_status == "
- "ColdBackupCheckpointing",
- backup_context->name);
- tasking::enqueue(LPC_REPLICATION_COLD_BACKUP,
- &_replica->_tracker,
- [this, request]() { on_clear_cold_backup(request); },
- get_gpid().thread_hash(),
- std::chrono::seconds(100));
- return;
- }
-
- _replica->_cold_backup_contexts.erase(request.policy_name);
- }
-
- background_clear_backup_checkpoint(request.policy_name);
-}
-
-void replica_backup_manager::start_collect_backup_info()
-{
- if (_collect_info_timer == nullptr) {
- _collect_info_timer =
- tasking::enqueue_timer(LPC_PER_REPLICA_COLLECT_INFO_TIMER,
- &_replica->_tracker,
- [this]() { collect_backup_info(); },
- std::chrono::milliseconds(FLAGS_gc_interval_ms),
- get_gpid().thread_hash());
- }
-}
-
-void replica_backup_manager::collect_backup_info()
-{
- uint64_t cold_backup_running_count = 0;
- uint64_t cold_backup_max_duration_time_ms = 0;
- uint64_t cold_backup_max_upload_file_size = 0;
- uint64_t now_ms = dsn_now_ms();
-
- // collect backup info from all of the cold backup contexts
- for (const auto &p : _replica->_cold_backup_contexts) {
- const cold_backup_context_ptr &backup_context = p.second;
- cold_backup_status backup_status = backup_context->status();
- if (_replica->status() == partition_status::type::PS_PRIMARY) {
- if (backup_status > ColdBackupInvalid && backup_status < ColdBackupCanceled) {
- cold_backup_running_count++;
- }
- } else if (_replica->status() == partition_status::type::PS_SECONDARY) {
- // secondary end backup with status ColdBackupCheckpointed
- if (backup_status > ColdBackupInvalid && backup_status < ColdBackupCheckpointed) {
- cold_backup_running_count++;
- }
- }
-
- if (backup_status == ColdBackupUploading) {
- cold_backup_max_duration_time_ms = std::max(
- cold_backup_max_duration_time_ms, now_ms - backup_context->get_start_time_ms());
- cold_backup_max_upload_file_size =
- std::max(cold_backup_max_upload_file_size, backup_context->get_upload_file_size());
- }
- }
-
- METRIC_VAR_SET(backup_running_count, cold_backup_running_count);
- METRIC_VAR_SET(backup_max_duration_ms, cold_backup_max_duration_time_ms);
- METRIC_VAR_SET(backup_file_upload_max_bytes, cold_backup_max_upload_file_size);
-}
-
-void replica_backup_manager::background_clear_backup_checkpoint(const std::string &policy_name)
-{
- LOG_INFO_PREFIX("schedule to clear all checkpoint dirs of policy({}) after {} minutes",
- policy_name,
- FLAGS_cold_backup_checkpoint_reserve_minutes);
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
- &_replica->_tracker,
- [this, policy_name]() { clear_backup_checkpoint(policy_name); },
- get_gpid().thread_hash(),
- std::chrono::minutes(FLAGS_cold_backup_checkpoint_reserve_minutes));
-}
-
-// clear all checkpoint dirs of the policy
-void replica_backup_manager::clear_backup_checkpoint(const std::string &policy_name)
-{
- LOG_INFO_PREFIX("clear all checkpoint dirs of policy({})", policy_name);
- auto backup_dir = _replica->_app->backup_dir();
- if (!utils::filesystem::directory_exists(backup_dir)) {
- return;
- }
-
- // Find the corresponding checkpoint dirs with policy name
- std::vector<std::string> chkpt_dirs;
- if (!get_policy_checkpoint_dirs(backup_dir, policy_name, chkpt_dirs)) {
- LOG_WARNING_PREFIX("get checkpoint dirs in backup dir({}) failed", backup_dir);
- return;
- }
-
- // Remove these checkpoint dirs
- for (const std::string &dirname : chkpt_dirs) {
- std::string full_path = utils::filesystem::path_combine(backup_dir, dirname);
- if (utils::filesystem::remove_path(full_path)) {
- LOG_INFO_PREFIX("remove backup checkpoint dir({}) succeed", full_path);
- } else {
- LOG_WARNING_PREFIX("remove backup checkpoint dir({}) failed", full_path);
- }
- }
-}
-
-void replica_backup_manager::send_clear_request_to_secondaries(const gpid &pid,
- const std::string &policy_name)
-{
- backup_clear_request request;
- request.__set_pid(pid);
- request.__set_policy_name(policy_name);
-
- for (const auto &target_address : _replica->_primary_states.membership.secondaries) {
- rpc::call_one_way_typed(
- target_address, RPC_CLEAR_COLD_BACKUP, request, get_gpid().thread_hash());
- }
-}
+replica_backup_manager::~replica_backup_manager() {}
} // namespace replication
} // namespace dsn
diff --git a/src/replica/backup/replica_backup_manager.h b/src/replica/backup/replica_backup_manager.h
index 40d0055..aa941b3 100644
--- a/src/replica/backup/replica_backup_manager.h
+++ b/src/replica/backup/replica_backup_manager.h
@@ -17,18 +17,14 @@
#pragma once
-#include <string>
-
#include "replica/replica_base.h"
-#include "runtime/task/task.h"
-#include "utils/metrics.h"
namespace dsn {
-class gpid;
namespace replication {
-class backup_clear_request;
+// TODO(heyuchen): implement it
+
class replica;
class replica_backup_manager : replica_base
@@ -37,24 +33,11 @@
explicit replica_backup_manager(replica *r);
~replica_backup_manager();
- void on_clear_cold_backup(const backup_clear_request &request);
- void start_collect_backup_info();
-
private:
friend class replica;
friend class replica_backup_manager_test;
- void clear_backup_checkpoint(const std::string &policy_name);
- void send_clear_request_to_secondaries(const gpid &pid, const std::string &policy_name);
- void background_clear_backup_checkpoint(const std::string &policy_name);
- void collect_backup_info();
-
replica *_replica;
- dsn::task_ptr _collect_info_timer;
-
- METRIC_VAR_DECLARE_gauge_int64(backup_running_count);
- METRIC_VAR_DECLARE_gauge_int64(backup_max_duration_ms);
- METRIC_VAR_DECLARE_gauge_int64(backup_file_upload_max_bytes);
};
} // namespace replication
diff --git a/src/replica/backup/replica_backup_server.cpp b/src/replica/backup/replica_backup_server.cpp
deleted file mode 100644
index ef7ddc4..0000000
--- a/src/replica/backup/replica_backup_server.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "replica_backup_server.h"
-
-#include <string>
-
-#include "backup_types.h"
-#include "common/gpid.h"
-#include "common/replication.codes.h"
-#include "replica/replica.h"
-#include "replica/replica_stub.h"
-#include "replica_backup_manager.h"
-#include "runtime/api_layer1.h"
-#include "runtime/rpc/serialization.h"
-#include "utils/autoref_ptr.h"
-#include "utils/error_code.h"
-#include "utils/flags.h"
-#include "utils/fmt_logging.h"
-#include "utils/strings.h"
-
-DSN_DECLARE_string(cold_backup_root);
-
-namespace dsn {
-class message_ex;
-
-namespace replication {
-
-replica_backup_server::replica_backup_server(const replica_stub *rs) : _stub(rs)
-{
- dsn_rpc_register_handler(RPC_COLD_BACKUP, "cold_backup", [this](message_ex *msg) {
- on_cold_backup(backup_rpc::auto_reply(msg));
- });
- dsn_rpc_register_handler(RPC_CLEAR_COLD_BACKUP, "clear_cold_backup", [this](message_ex *msg) {
- backup_clear_request clear_req;
- unmarshall(msg, clear_req);
- on_clear_cold_backup(clear_req);
- });
-}
-
-replica_backup_server::~replica_backup_server()
-{
- dsn_rpc_unregiser_handler(RPC_COLD_BACKUP);
- dsn_rpc_unregiser_handler(RPC_CLEAR_COLD_BACKUP);
-}
-
-void replica_backup_server::on_cold_backup(backup_rpc rpc)
-{
- const backup_request &request = rpc.request();
- backup_response &response = rpc.response();
-
- LOG_INFO("received cold backup request: backup[{}.{}.{}]",
- request.pid,
- request.policy.policy_name,
- request.backup_id);
- response.pid = request.pid;
- response.policy_name = request.policy.policy_name;
- response.backup_id = request.backup_id;
-
- if (utils::is_empty(FLAGS_cold_backup_root)) {
- LOG_ERROR(
- "backup[{}.{}.{}]: FLAGS_cold_backup_root is empty, response ERR_OPERATION_DISABLED",
- request.pid,
- request.policy.policy_name,
- request.backup_id);
- response.err = ERR_OPERATION_DISABLED;
- return;
- }
-
- replica_ptr rep = _stub->get_replica(request.pid);
- if (rep != nullptr) {
- rep->on_cold_backup(request, response);
- } else {
- LOG_ERROR("backup[{}.{}.{}]: replica not found, response ERR_OBJECT_NOT_FOUND",
- request.pid,
- request.policy.policy_name,
- request.backup_id);
- response.err = ERR_OBJECT_NOT_FOUND;
- }
-}
-
-void replica_backup_server::on_clear_cold_backup(const backup_clear_request &request)
-{
- LOG_INFO("receive clear cold backup request: backup({}.{})",
- request.pid,
- request.policy_name.c_str());
-
- replica_ptr rep = _stub->get_replica(request.pid);
- if (rep != nullptr) {
- rep->get_backup_manager()->on_clear_cold_backup(request);
- }
-}
-
-} // namespace replication
-} // namespace dsn
diff --git a/src/replica/backup/replica_backup_server.h b/src/replica/backup/replica_backup_server.h
deleted file mode 100644
index 7d30f78..0000000
--- a/src/replica/backup/replica_backup_server.h
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/backup_common.h"
-
-namespace dsn {
-namespace replication {
-
-class backup_clear_request;
-class replica_stub;
-
-// A server distributes the cold-backup task to the targeted replica.
-class replica_backup_server
-{
-public:
- explicit replica_backup_server(const replica_stub *rs);
- ~replica_backup_server();
-
-private:
- void on_cold_backup(backup_rpc rpc);
-
- void on_clear_cold_backup(const backup_clear_request &request);
-
-private:
- const replica_stub *_stub;
-};
-
-} // namespace replication
-} // namespace dsn
diff --git a/src/replica/backup/test/replica_backup_manager_test.cpp b/src/replica/backup/test/replica_backup_manager_test.cpp
index ba9b5be..546fb5c 100644
--- a/src/replica/backup/test/replica_backup_manager_test.cpp
+++ b/src/replica/backup/test/replica_backup_manager_test.cpp
@@ -15,42 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-#include <memory>
-#include <string>
-
-#include "gtest/gtest.h"
-#include "replica/backup/replica_backup_manager.h"
-#include "replica/replication_app_base.h"
-#include "replica/test/mock_utils.h"
#include "replica/test/replica_test_base.h"
-#include "utils/filesystem.h"
namespace dsn {
namespace replication {
+// TODO(heyuchen): implement it
class replica_backup_manager_test : public replica_test_base
{
public:
- void clear_backup_checkpoint(const std::string &policy_name)
- {
- _replica->get_backup_manager()->clear_backup_checkpoint(policy_name);
- }
};
-INSTANTIATE_TEST_SUITE_P(, replica_backup_manager_test, ::testing::Values(false, true));
-
-TEST_P(replica_backup_manager_test, clear_cold_backup)
-{
- std::string policy_name = "test_policy";
-
- // create policy dir: <backup_dir>/backup.<policy_name>.*
- std::string policy_dir = _replica->get_app()->backup_dir() + "/backup." + policy_name;
- utils::filesystem::create_directory(policy_dir);
-
- // clear policy dir
- clear_backup_checkpoint(policy_name);
- ASSERT_FALSE(utils::filesystem::directory_exists(policy_dir));
-}
-
} // namespace replication
} // namespace dsn
diff --git a/src/replica/duplication/test/duplication_test_base.h b/src/replica/duplication/test/duplication_test_base.h
index eb914f3..690e46b 100644
--- a/src/replica/duplication/test/duplication_test_base.h
+++ b/src/replica/duplication/test/duplication_test_base.h
@@ -22,6 +22,7 @@
#include "replica/duplication/replica_duplicator.h"
#include "replica/duplication/replica_duplicator_manager.h"
#include "replica/duplication/duplication_sync_timer.h"
+#include "common/replication.codes.h"
namespace dsn {
namespace replication {
diff --git a/src/replica/replica.h b/src/replica/replica.h
index d94871f..7261f78 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -34,7 +34,9 @@
#include <memory>
#include <string>
#include <utility>
+#include <vector>
+#include "common/json_helper.h"
#include "common/replication_other_types.h"
#include "dsn.layer2_types.h"
#include "meta_admin_types.h"
@@ -43,7 +45,6 @@
#include "mutation_log.h"
#include "prepare_list.h"
#include "ranger/access_type.h"
-#include "replica/backup/cold_backup_context.h"
#include "replica/replica_base.h"
#include "replica_context.h"
#include "runtime/api_layer1.h"
@@ -80,8 +81,6 @@
} // namespace security
namespace replication {
-class backup_request;
-class backup_response;
class configuration_restore_request;
class detect_hotkey_request;
class detect_hotkey_response;
@@ -104,8 +103,6 @@
class replication_options;
struct dir_node;
-typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
-
namespace test {
class test_checker;
}
@@ -132,6 +129,16 @@
const std::string &name,
/*out*/ bool &value);
+// TODO(heyuchen): refactor it
+struct cold_backup_metadata
+{
+ int64_t checkpoint_decree;
+ int64_t checkpoint_timestamp;
+ std::vector<file_meta> files;
+ int64_t checkpoint_total_size;
+ DEFINE_JSON_SERIALIZATION(checkpoint_decree, checkpoint_timestamp, files, checkpoint_total_size)
+};
+
struct deny_client
{
bool read{false};
@@ -188,7 +195,6 @@
void on_config_sync(const app_info &info,
const partition_configuration &config,
split_status::type meta_split_status);
- void on_cold_backup(const backup_request &request, /*out*/ backup_response &response);
//
// messages from peers (primary or secondary)
@@ -428,17 +434,6 @@
const std::string &chk_dir);
/////////////////////////////////////////////////////////////////
- // cold backup
- virtual void generate_backup_checkpoint(cold_backup_context_ptr backup_context);
- void trigger_async_checkpoint_for_backup(cold_backup_context_ptr backup_context);
- void wait_async_checkpoint_for_backup(cold_backup_context_ptr backup_context);
- void local_create_backup_checkpoint(cold_backup_context_ptr backup_context);
- void send_backup_request_to_secondary(const backup_request &request);
- // set all cold_backup_state cancel/pause
- void set_backup_context_cancel();
- void clear_cold_backup_state();
-
- /////////////////////////////////////////////////////////////////
// replica restore from backup
bool read_cold_backup_metadata(const std::string &fname, cold_backup_metadata &backup_metadata);
// checkpoint on cold backup media maybe contain useless file,
@@ -591,8 +586,6 @@
primary_context _primary_states;
secondary_context _secondary_states;
potential_secondary_context _potential_secondary_states;
- // policy_name --> cold_backup_context
- std::map<std::string, cold_backup_context_ptr> _cold_backup_contexts;
partition_split_context _split_states;
// record the progress of restore
diff --git a/src/replica/replica_backup.cpp b/src/replica/replica_backup.cpp
deleted file mode 100644
index 427eabc..0000000
--- a/src/replica/replica_backup.cpp
+++ /dev/null
@@ -1,752 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <boost/cstdint.hpp>
-// IWYU pragma: no_include <boost/detail/basic_pointerbuf.hpp>
-#include <boost/lexical_cast.hpp>
-// IWYU pragma: no_include <ext/alloc_traits.h>
-#include <inttypes.h>
-#include <stdio.h>
-#include <algorithm>
-#include <chrono>
-#include <cstdint>
-#include <ios>
-#include <map>
-#include <memory>
-#include <string>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "backup/cold_backup_context.h"
-#include "backup/replica_backup_manager.h"
-#include "backup_types.h"
-#include "block_service/block_service_manager.h"
-#include "common/gpid.h"
-#include "common/replication.codes.h"
-#include "common/replication_enums.h"
-#include "common/replication_other_types.h"
-#include "dsn.layer2_types.h"
-#include "metadata_types.h"
-#include "replica.h"
-#include "replica/replica_context.h"
-#include "replica/replication_app_base.h"
-#include "replica_stub.h"
-#include "runtime/api_layer1.h"
-#include "runtime/task/async_calls.h"
-#include "utils/autoref_ptr.h"
-#include "utils/env.h"
-#include "utils/error_code.h"
-#include "utils/filesystem.h"
-#include "utils/flags.h"
-#include "utils/fmt_logging.h"
-#include "utils/metrics.h"
-#include "utils/strings.h"
-#include "utils/thread_access_checker.h"
-#include "utils/time_utils.h"
-
-DSN_DEFINE_uint64(replication,
- max_concurrent_uploading_file_count,
- 10,
- "concurrent uploading file count to block service");
-
-DSN_DECLARE_string(cold_backup_root);
-
-namespace dsn {
-namespace dist {
-namespace block_service {
-class block_filesystem;
-} // namespace block_service
-} // namespace dist
-
-namespace replication {
-void replica::on_cold_backup(const backup_request &request, /*out*/ backup_response &response)
-{
- _checker.only_one_thread_access();
-
- const std::string &policy_name = request.policy.policy_name;
- auto backup_id = request.backup_id;
- cold_backup_context_ptr new_context(
- new cold_backup_context(this, request, FLAGS_max_concurrent_uploading_file_count));
-
- LOG_INFO_PREFIX("{}: received cold backup request, partition_status = {}",
- new_context->name,
- enum_to_string(status()));
-
- if (status() == partition_status::type::PS_PRIMARY ||
- status() == partition_status::type::PS_SECONDARY) {
- cold_backup_context_ptr backup_context = nullptr;
- auto find = _cold_backup_contexts.find(policy_name);
- if (find != _cold_backup_contexts.end()) {
- backup_context = find->second;
- } else {
- /// TODO: policy may change provider
- dist::block_service::block_filesystem *block_service =
- _stub->_block_service_manager.get_or_create_block_filesystem(
- request.policy.backup_provider_type);
- if (block_service == nullptr) {
- LOG_ERROR(
- "{}: create cold backup block service failed, provider_type = {}, response "
- "ERR_INVALID_PARAMETERS",
- new_context->name,
- request.policy.backup_provider_type);
- response.err = ERR_INVALID_PARAMETERS;
- return;
- }
- auto r = _cold_backup_contexts.insert(std::make_pair(policy_name, new_context));
- CHECK(r.second, "");
- backup_context = r.first->second;
- backup_context->block_service = block_service;
- backup_context->backup_root = request.__isset.backup_path
- ? dsn::utils::filesystem::path_combine(
- request.backup_path, FLAGS_cold_backup_root)
- : FLAGS_cold_backup_root;
- }
-
- CHECK_EQ_PREFIX(backup_context->request.policy.policy_name, policy_name);
- cold_backup_status backup_status = backup_context->status();
-
- if (backup_context->request.backup_id < backup_id || backup_status == ColdBackupCanceled) {
- if (backup_status == ColdBackupCheckpointing) {
- LOG_INFO("{}: delay clearing obsoleted cold backup context, cause backup_status == "
- "ColdBackupCheckpointing",
- new_context->name);
- tasking::enqueue(LPC_REPLICATION_COLD_BACKUP,
- &_tracker,
- [this, request]() {
- backup_response response;
- on_cold_backup(request, response);
- },
- get_gpid().thread_hash(),
- std::chrono::seconds(100));
- } else {
- // TODO(wutao1): deleting cold backup context should be
- // extracted as a function like try_delete_cold_backup_context;
- // clear obsoleted backup context firstly
- LOG_INFO("{}: clear obsoleted cold backup context, old_backup_id = {}, "
- "old_backup_status = {}",
- new_context->name,
- backup_context->request.backup_id,
- cold_backup_status_to_string(backup_status));
- backup_context->cancel();
- _cold_backup_contexts.erase(policy_name);
- // go to another round
- on_cold_backup(request, response);
- }
- return;
- }
-
- if (backup_context->request.backup_id > backup_id) {
- // backup_id is outdated
- LOG_ERROR("{}: request outdated cold backup, current_backup_id = {}, response "
- "ERR_VERSION_OUTDATED",
- new_context->name,
- backup_context->request.backup_id);
- response.err = ERR_VERSION_OUTDATED;
- return;
- }
-
- // for secondary, request is already filtered by primary, so if
- // request is repeated, so generate_backup_checkpoint is already running, we do
- // nothing;
- // request is new, we should call generate_backup_checkpoint;
-
- // TODO: if secondary's status have changed, how to process the _cold_backup_state,
- // and how to process the backup_status, cancel/pause
- if (status() == partition_status::PS_SECONDARY) {
- if (backup_status == ColdBackupInvalid) {
- // new backup_request, should set status to ColdBackupChecked to allow secondary
- // can start to checkpoint
- backup_context->start_check();
- backup_context->complete_check(false);
- if (backup_context->start_checkpoint()) {
- METRIC_VAR_INCREMENT(backup_started_count);
- tasking::enqueue(
- LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, backup_context]() {
- generate_backup_checkpoint(backup_context);
- });
- }
- }
- return;
- }
-
- send_backup_request_to_secondary(request);
-
- if (backup_status == ColdBackupChecking || backup_status == ColdBackupCheckpointing ||
- backup_status == ColdBackupUploading) {
- // do nothing
- LOG_INFO("{}: backup is busy, status = {}, progress = {}, response ERR_BUSY",
- backup_context->name,
- cold_backup_status_to_string(backup_status),
- backup_context->progress());
- response.err = ERR_BUSY;
- } else if (backup_status == ColdBackupInvalid && backup_context->start_check()) {
- METRIC_VAR_INCREMENT(backup_started_count);
- LOG_INFO("{}: start checking backup on remote, response ERR_BUSY",
- backup_context->name);
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, nullptr, [backup_context]() {
- backup_context->check_backup_on_remote();
- });
- response.err = ERR_BUSY;
- } else if (backup_status == ColdBackupChecked && backup_context->start_checkpoint()) {
- // start generating checkpoint
- LOG_INFO("{}: start generating checkpoint, response ERR_BUSY", backup_context->name);
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, backup_context]() {
- generate_backup_checkpoint(backup_context);
- });
- response.err = ERR_BUSY;
- } else if ((backup_status == ColdBackupCheckpointed || backup_status == ColdBackupPaused) &&
- backup_context->start_upload()) {
- // start uploading checkpoint
- LOG_INFO("{}: start uploading checkpoint, response ERR_BUSY", backup_context->name);
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, nullptr, [backup_context]() {
- backup_context->upload_checkpoint_to_remote();
- });
- response.err = ERR_BUSY;
- } else if (backup_status == ColdBackupFailed) {
- LOG_ERROR("{}: upload checkpoint failed, reason = {}, response ERR_LOCAL_APP_FAILURE",
- backup_context->name,
- backup_context->reason());
- response.err = ERR_LOCAL_APP_FAILURE;
- backup_context->cancel();
- _cold_backup_contexts.erase(policy_name);
- } else if (backup_status == ColdBackupCompleted) {
- LOG_INFO("{}: upload checkpoint completed, response ERR_OK", backup_context->name);
- _backup_mgr->send_clear_request_to_secondaries(backup_context->request.pid,
- policy_name);
-
- // clear local checkpoint dirs in background thread
- _backup_mgr->background_clear_backup_checkpoint(policy_name);
- response.err = ERR_OK;
- } else {
- LOG_WARNING(
- "{}: unhandled case, handle_status = {}, real_time_status = {}, response ERR_BUSY",
- backup_context->name,
- cold_backup_status_to_string(backup_status),
- cold_backup_status_to_string(backup_context->status()));
- response.err = ERR_BUSY;
- }
-
- response.progress = backup_context->progress();
- response.checkpoint_total_size = backup_context->get_checkpoint_total_size();
- LOG_INFO("{}: backup progress is {}", backup_context->name, response.progress);
- } else {
- LOG_ERROR(
- "{}: invalid state for cold backup, partition_status = {}, response ERR_INVALID_STATE",
- new_context->name,
- enum_to_string(status()));
- response.err = ERR_INVALID_STATE;
- }
-}
-
-void replica::send_backup_request_to_secondary(const backup_request &request)
-{
- for (const auto &target_address : _primary_states.membership.secondaries) {
- // primary will send backup_request to secondary periodically
- // so, we shouldn't handle the response
- rpc::call_one_way_typed(target_address, RPC_COLD_BACKUP, request, get_gpid().thread_hash());
- }
-}
-
-// backup/backup.<policy_name>.<backup_id>.<decree>.<timestamp>
-static std::string backup_get_dir_name(const std::string &policy_name,
- int64_t backup_id,
- int64_t decree,
- int64_t timestamp)
-{
- char buffer[256];
- sprintf(buffer,
- "backup.%s.%" PRId64 ".%" PRId64 ".%" PRId64 "",
- policy_name.c_str(),
- backup_id,
- decree,
- timestamp);
- return std::string(buffer);
-}
-
-// backup/backup_tmp.<policy_name>.<backup_id>.<timestamp>
-static std::string
-backup_get_tmp_dir_name(const std::string &policy_name, int64_t backup_id, int64_t timestamp)
-{
- char buffer[256];
- sprintf(
- buffer, "backup_tmp.%s.%" PRId64 ".%" PRId64 "", policy_name.c_str(), backup_id, timestamp);
- return std::string(buffer);
-}
-
-// returns:
-// 0 : not related
-// 1 : related (belong to this policy but not belong to this backup_context)
-// 2 : valid (belong to this policy and belong to this backup_context)
-static int is_related_or_valid_checkpoint(const std::string &chkpt_dirname,
- const cold_backup_context_ptr &backup_context)
-{
- std::vector<std::string> strs;
- ::dsn::utils::split_args(chkpt_dirname.c_str(), strs, '.');
- if (strs.size() == 4 && strs[0] == std::string("backup_tmp") &&
- strs[1] == backup_context->request.policy.policy_name) {
- // backup_tmp.<policy_name>.<backup_id>.<timestamp>
- // refer to backup_get_tmp_dir_name().
- int64_t backup_id = boost::lexical_cast<int64_t>(strs[2]);
- if (backup_id < backup_context->request.backup_id) {
- // it belongs to old backup_context, we can remove it safely.
- return 1;
- }
- } else if (strs.size() == 5 && strs[0] == std::string("backup_tmp") &&
- strs[4] == std::string("tmp") &&
- strs[1] == backup_context->request.policy.policy_name) {
- // backup_tmp.<policy_name>.<backup_id>.<timestamp>.tmp
- // refer to CheckpointImpl::CreateCheckpointQuick().
- int64_t backup_id = boost::lexical_cast<int64_t>(strs[2]);
- if (backup_id < backup_context->request.backup_id) {
- // it belongs to old backup_context, we can remove it safely.
- return 1;
- }
- } else if (strs.size() == 5 && strs[0] == std::string("backup") &&
- strs[1] == backup_context->request.policy.policy_name) {
- // backup.<policy_name>.<backup_id>.<decree>.<timestamp>
- // refer to backup_get_dir_name().
- int64_t backup_id = boost::lexical_cast<int64_t>(strs[2]);
- // here, we only need policy_name and backup_id to verify whether chkpt_dirname belong
- // to this backup_context.
- if (backup_id == backup_context->request.backup_id) {
- // it belongs to this backup_context.
- return 2;
- } else if (backup_id < backup_context->request.backup_id) {
- // it belongs to old backup_context, we can remove it safely.
- return 1;
- }
- } else {
- // unknown dir, ignore it
- LOG_WARNING("{}: found a invalid checkpoint dir({})", backup_context->name, chkpt_dirname);
- }
- return 0;
-}
-
-// filter backup checkpoint under 'dir'
-// - find the valid backup checkpoint dir if exist
-// - find all the backup checkpoint belong to this policy, mainly obsolete backup checkpoint
-static bool filter_checkpoint(const std::string &dir,
- const cold_backup_context_ptr &backup_context,
- /*out*/ std::vector<std::string> &related_chkpt_dirs,
- /*out*/ std::string &valid_chkpt_dir)
-{
- valid_chkpt_dir.clear();
- related_chkpt_dirs.clear();
- // list sub dirs
- std::vector<std::string> sub_dirs;
- if (!utils::filesystem::get_subdirectories(dir, sub_dirs, false)) {
- LOG_ERROR("{}: list sub dirs of dir {} failed", backup_context->name, dir);
- return false;
- }
-
- for (std::string &d : sub_dirs) {
- std::string dirname = utils::filesystem::get_file_name(d);
- int ret = is_related_or_valid_checkpoint(dirname, backup_context);
- if (ret == 1) {
- related_chkpt_dirs.emplace_back(std::move(dirname));
- } else if (ret == 2) {
- CHECK(valid_chkpt_dir.empty(),
- "{}: there are two valid backup checkpoint dir, {} VS {}",
- backup_context->name,
- valid_chkpt_dir,
- dirname);
- valid_chkpt_dir = dirname;
- }
- }
- return true;
-}
-
-static bool
-statistic_file_infos_under_dir(const std::string &dir,
- /*out*/ std::vector<std::pair<std::string, int64_t>> &file_infos,
- /*out*/ int64_t &total_size)
-{
- std::vector<std::string> sub_files;
- if (!utils::filesystem::get_subfiles(dir, sub_files, false)) {
- LOG_ERROR("list sub files of dir {} failed", dir);
- return false;
- }
-
- total_size = 0;
- file_infos.clear();
-
- // TODO(yingchun): check if there are any files that are not sensitive (not encrypted).
- for (std::string &file : sub_files) {
- std::pair<std::string, int64_t> file_info;
-
- if (!utils::filesystem::file_size(
- file, dsn::utils::FileDataType::kSensitive, file_info.second)) {
- LOG_ERROR("get file size of {} failed", file);
- return false;
- }
- file_info.first = utils::filesystem::get_file_name(file);
- total_size += file_info.second;
-
- file_infos.emplace_back(std::move(file_info));
- }
- return true;
-}
-
-static bool backup_parse_dir_name(const char *name,
- std::string &policy_name,
- int64_t &backup_id,
- int64_t &decree,
-                                  int64_t &timestamp)
-{
- std::vector<std::string> strs;
- ::dsn::utils::split_args(name, strs, '.');
- if (strs.size() < 5) {
- return false;
- } else {
- policy_name = strs[1];
- backup_id = boost::lexical_cast<int64_t>(strs[2]);
- decree = boost::lexical_cast<int64_t>(strs[3]);
- timestamp = boost::lexical_cast<int64_t>(strs[4]);
- return (std::string(name) ==
- backup_get_dir_name(policy_name, backup_id, decree, timestamp));
- }
-}
-
-// run in REPLICATION_LONG thread
-// Effection:
-// - may ignore_checkpoint() if in invalid status
-// - may fail_checkpoint() if some error occurs
-// - may complete_checkpoint() and schedule on_cold_backup() if backup checkpoint dir is already
-// exist
-// - may schedule trigger_async_checkpoint_for_backup() if backup checkpoint dir is not exist
-void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context)
-{
- if (backup_context->status() != ColdBackupCheckpointing) {
- LOG_INFO("{}: ignore generating backup checkpoint because backup_status = {}",
- backup_context->name,
- cold_backup_status_to_string(backup_context->status()));
- backup_context->ignore_checkpoint();
- return;
- }
-
- // prepare back dir
- auto backup_dir = _app->backup_dir();
- if (!utils::filesystem::directory_exists(backup_dir) &&
- !utils::filesystem::create_directory(backup_dir)) {
- LOG_ERROR("{}: create backup dir {} failed", backup_context->name, backup_dir);
- backup_context->fail_checkpoint("create backup dir failed");
- return;
- }
-
- std::vector<std::string> related_backup_chkpt_dirname;
- std::string valid_backup_chkpt_dirname;
- if (!filter_checkpoint(
- backup_dir, backup_context, related_backup_chkpt_dirname, valid_backup_chkpt_dirname)) {
- // encounter some error, just return
- backup_context->fail_checkpoint("list sub backup dir failed");
- return;
- }
- if (!valid_backup_chkpt_dirname.empty()) {
- std::vector<std::pair<std::string, int64_t>> file_infos;
- int64_t total_size = 0;
- std::string valid_chkpt_full_path =
- utils::filesystem::path_combine(backup_dir, valid_backup_chkpt_dirname);
- // parse checkpoint dirname
- std::string policy_name;
- int64_t backup_id = 0, decree = 0, timestamp = 0;
- CHECK(backup_parse_dir_name(
- valid_backup_chkpt_dirname.c_str(), policy_name, backup_id, decree, timestamp),
- "{}: valid chekpoint dirname {}",
- backup_context->name,
- valid_backup_chkpt_dirname);
-
- if (statistic_file_infos_under_dir(valid_chkpt_full_path, file_infos, total_size)) {
- backup_context->checkpoint_decree = decree;
- backup_context->checkpoint_timestamp = timestamp;
- backup_context->checkpoint_dir = valid_chkpt_full_path;
- for (std::pair<std::string, int64_t> &p : file_infos) {
- backup_context->checkpoint_files.emplace_back(std::move(p.first));
- backup_context->checkpoint_file_sizes.emplace_back(std::move(p.second));
- }
- backup_context->checkpoint_file_total_size = total_size;
- backup_context->complete_checkpoint();
-
- LOG_INFO(
- "{}: backup checkpoint aleady exist, dir = {}, file_count = {}, total_size = {}",
- backup_context->name,
- backup_context->checkpoint_dir,
- file_infos.size(),
- total_size);
- // TODO: in primary, this will make the request send to secondary again
- tasking::enqueue(LPC_REPLICATION_COLD_BACKUP,
- &_tracker,
- [this, backup_context]() {
- backup_response response;
- on_cold_backup(backup_context->request, response);
- },
- get_gpid().thread_hash());
- } else {
- backup_context->fail_checkpoint("statistic file info under checkpoint failed");
- return;
- }
- } else {
- LOG_INFO("{}: backup checkpoint not exist, start to trigger async checkpoint",
- backup_context->name);
- tasking::enqueue(
- LPC_REPLICATION_COLD_BACKUP,
- &_tracker,
- [this, backup_context]() { trigger_async_checkpoint_for_backup(backup_context); },
- get_gpid().thread_hash());
- }
-
- // clear related but not valid checkpoint
- for (const std::string &dirname : related_backup_chkpt_dirname) {
- std::string full_path = utils::filesystem::path_combine(backup_dir, dirname);
- LOG_INFO("{}: found obsolete backup checkpoint dir({}), remove it",
- backup_context->name,
- full_path);
- if (!utils::filesystem::remove_path(full_path)) {
- LOG_WARNING("{}: remove obsolete backup checkpoint dir({}) failed",
- backup_context->name,
- full_path);
- }
- }
-}
-
-// run in REPLICATION thread
-// Effection:
-// - may ignore_checkpoint() if in invalid status
-// - may fail_checkpoint() if some error occurs
-// - may trigger async checkpoint and invoke wait_async_checkpoint_for_backup()
-void replica::trigger_async_checkpoint_for_backup(cold_backup_context_ptr backup_context)
-{
- _checker.only_one_thread_access();
-
- if (backup_context->status() != ColdBackupCheckpointing) {
- LOG_INFO("{}: ignore triggering async checkpoint because backup_status = {}",
- backup_context->name,
- cold_backup_status_to_string(backup_context->status()));
- backup_context->ignore_checkpoint();
- return;
- }
-
- if (status() != partition_status::PS_PRIMARY && status() != partition_status::PS_SECONDARY) {
- LOG_INFO("{}: ignore triggering async checkpoint because partition_status = {}",
- backup_context->name,
- enum_to_string(status()));
- backup_context->ignore_checkpoint();
- return;
- }
-
- decree durable_decree = last_durable_decree();
- if (backup_context->checkpoint_decree > 0 &&
- durable_decree >= backup_context->checkpoint_decree) {
- // checkpoint done
- } else if (backup_context->checkpoint_decree > 0 &&
- backup_context->durable_decree_when_checkpoint == durable_decree) {
- // already triggered, just wait
- char time_buf[20];
- dsn::utils::time_ms_to_date_time(backup_context->checkpoint_timestamp, time_buf, 20);
- LOG_INFO("{}: do not trigger async checkpoint because it is already triggered, "
- "checkpoint_decree = {}, checkpoint_timestamp = {} ({}), "
- "durable_decree_when_checkpoint = {}",
- backup_context->name,
- backup_context->checkpoint_decree,
- backup_context->checkpoint_timestamp,
- time_buf,
- backup_context->durable_decree_when_checkpoint);
- } else { // backup_context->checkpoint_decree == 0 ||
- // backup_context->durable_decree_when_checkpoint != durable_decree
- if (backup_context->checkpoint_decree == 0) {
- // first trigger
- backup_context->checkpoint_decree = last_committed_decree();
- } else { // backup_context->durable_decree_when_checkpoint != durable_decree
- // checkpoint generated, but is behind checkpoint_decree, need trigger again
- CHECK_LT(backup_context->durable_decree_when_checkpoint, durable_decree);
- LOG_INFO("{}: need trigger async checkpoint again", backup_context->name);
- }
- backup_context->checkpoint_timestamp = dsn_now_ms();
- backup_context->durable_decree_when_checkpoint = durable_decree;
- char time_buf[20];
- dsn::utils::time_ms_to_date_time(backup_context->checkpoint_timestamp, time_buf, 20);
- LOG_INFO("{}: trigger async checkpoint, "
- "checkpoint_decree = {}, checkpoint_timestamp = {} ({}), "
- "durable_decree_when_checkpoint = {}",
- backup_context->name,
- backup_context->checkpoint_decree,
- backup_context->checkpoint_timestamp,
- time_buf,
- backup_context->durable_decree_when_checkpoint);
- init_checkpoint(true);
- }
-
- // after triggering init_checkpoint, we just wait until it finish
- wait_async_checkpoint_for_backup(backup_context);
-}
-
-// run in REPLICATION thread
-// Effection:
-// - may ignore_checkpoint() if in invalid status
-// - may delay some time and schedule trigger_async_checkpoint_for_backup() if async checkpoint not
-// completed
-// - may schedule local_create_backup_checkpoint if async checkpoint completed
-void replica::wait_async_checkpoint_for_backup(cold_backup_context_ptr backup_context)
-{
- _checker.only_one_thread_access();
-
- if (backup_context->status() != ColdBackupCheckpointing) {
- LOG_INFO("{}: ignore waiting async checkpoint because backup_status = {}",
- backup_context->name,
- cold_backup_status_to_string(backup_context->status()));
- backup_context->ignore_checkpoint();
- return;
- }
-
- if (status() != partition_status::PS_PRIMARY && status() != partition_status::PS_SECONDARY) {
- LOG_INFO("{}: ignore waiting async checkpoint because partition_status = {}",
- backup_context->name,
- enum_to_string(status()));
- backup_context->ignore_checkpoint();
- return;
- }
-
- decree du = last_durable_decree();
- if (du < backup_context->checkpoint_decree) {
- LOG_INFO("{}: async checkpoint not done, we just wait it done, "
- "last_durable_decree = {}, backup_checkpoint_decree = {}",
- backup_context->name,
- du,
- backup_context->checkpoint_decree);
- tasking::enqueue(
- LPC_REPLICATION_COLD_BACKUP,
- &_tracker,
- [this, backup_context]() { trigger_async_checkpoint_for_backup(backup_context); },
- get_gpid().thread_hash(),
- std::chrono::seconds(10));
- } else {
- LOG_INFO("{}: async checkpoint done, last_durable_decree = {}, "
- "backup_context->checkpoint_decree = {}",
- backup_context->name,
- du,
- backup_context->checkpoint_decree);
- tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, backup_context]() {
- local_create_backup_checkpoint(backup_context);
- });
- }
-}
-
-// run in REPLICATION_LONG thread
-// Effection:
-// - may ignore_checkpoint() if in invalid status
-// - may fail_checkpoint() if some error occurs
-// - may complete_checkpoint() and schedule on_cold_backup() if checkpoint dir is successfully
-// copied
-void replica::local_create_backup_checkpoint(cold_backup_context_ptr backup_context)
-{
- if (backup_context->status() != ColdBackupCheckpointing) {
- LOG_INFO("{}: ignore generating backup checkpoint because backup_status = {}",
- backup_context->name,
- cold_backup_status_to_string(backup_context->status()));
- backup_context->ignore_checkpoint();
- return;
- }
-
- // the real checkpoint decree may be larger than backup_context->checkpoint_decree,
- // so we need copy checkpoint to backup_checkpoint_tmp_dir_path, and then rename it.
- std::string backup_checkpoint_tmp_dir_path = utils::filesystem::path_combine(
- _app->backup_dir(),
- backup_get_tmp_dir_name(backup_context->request.policy.policy_name,
- backup_context->request.backup_id,
- backup_context->checkpoint_timestamp));
- int64_t last_decree = 0;
- dsn::error_code err =
- _app->copy_checkpoint_to_dir(backup_checkpoint_tmp_dir_path.c_str(), &last_decree);
- if (err != ERR_OK) {
- // try local_create_backup_checkpoint 10s later
- LOG_INFO("{}: create backup checkpoint failed with err = {}, try call "
- "local_create_backup_checkpoint 10s later",
- backup_context->name,
- err);
- utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path);
- tasking::enqueue(
- LPC_BACKGROUND_COLD_BACKUP,
- &_tracker,
- [this, backup_context]() { local_create_backup_checkpoint(backup_context); },
- 0,
- std::chrono::seconds(10));
- } else {
- CHECK_GE(last_decree, backup_context->checkpoint_decree);
- backup_context->checkpoint_decree = last_decree; // update to real decree
- std::string backup_checkpoint_dir_path = utils::filesystem::path_combine(
- _app->backup_dir(),
- backup_get_dir_name(backup_context->request.policy.policy_name,
- backup_context->request.backup_id,
- backup_context->checkpoint_decree,
- backup_context->checkpoint_timestamp));
- if (!utils::filesystem::rename_path(backup_checkpoint_tmp_dir_path,
- backup_checkpoint_dir_path)) {
- LOG_ERROR("{}: rename checkpoint dir({}) to dir({}) failed",
- backup_context->name,
- backup_checkpoint_tmp_dir_path,
- backup_checkpoint_dir_path);
- utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path);
- utils::filesystem::remove_path(backup_checkpoint_dir_path);
- backup_context->fail_checkpoint("rename checkpoint dir failed");
- return;
- }
-
- std::vector<std::pair<std::string, int64_t>> file_infos;
- int64_t total_size = 0;
- if (!statistic_file_infos_under_dir(backup_checkpoint_dir_path, file_infos, total_size)) {
- LOG_ERROR("{}: statistic file info under dir({}) failed",
- backup_context->name,
- backup_checkpoint_dir_path);
- backup_context->fail_checkpoint("statistic file info under dir failed");
- return;
- }
-
- LOG_INFO(
- "{}: generate backup checkpoint succeed, dir = {}, file_count = {}, total_size = {}",
- backup_context->name,
- backup_checkpoint_dir_path,
- file_infos.size(),
- total_size);
- backup_context->checkpoint_dir = backup_checkpoint_dir_path;
- for (std::pair<std::string, int64_t> &pair : file_infos) {
- backup_context->checkpoint_files.emplace_back(std::move(pair.first));
- backup_context->checkpoint_file_sizes.emplace_back(std::move(pair.second));
- }
- backup_context->checkpoint_file_total_size = total_size;
- backup_context->complete_checkpoint();
- tasking::enqueue(LPC_REPLICATION_COLD_BACKUP,
- &_tracker,
- [this, backup_context]() {
- backup_response response;
- on_cold_backup(backup_context->request, response);
- },
- get_gpid().thread_hash());
- }
-}
-
-void replica::set_backup_context_cancel()
-{
- for (auto &pair : _cold_backup_contexts) {
- pair.second->cancel();
- LOG_INFO_PREFIX("cancel backup progress, backup_request = {}",
- boost::lexical_cast<std::string>(pair.second->request));
- }
-}
-
-void replica::clear_cold_backup_state() { _cold_backup_contexts.clear(); }
-} // namespace replication
-} // namespace dsn
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index 580d82b..9c5d550 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -809,8 +809,6 @@
// case 2: primary -> ps_inavtive & _inactive_is_transient = true -> secondary
// case 3: primary -> ps_inactive & _inactive_is_transient = ture
// case 4: ps_inactive & _inactive_is_transient = true -> primary or secondary
- // the way we process whether primary stop uploading backup checkpoint is that case-1 continue
- // uploading, others just stop uploading
switch (old_status) {
case partition_status::PS_PRIMARY:
cleanup_preparing_mutations(false);
@@ -820,24 +818,10 @@
break;
case partition_status::PS_INACTIVE:
_primary_states.cleanup(old_ballot != config.ballot);
- // here we use wheather ballot changes and wheather disconnecting with meta to
- // distinguish different case above mentioned
- if (old_ballot == config.ballot && _stub->is_connected()) {
- // case 1 and case 2, just continue uploading
- //(when case2, we stop uploading when it change to secondary)
- } else {
- set_backup_context_cancel();
- clear_cold_backup_state();
- }
break;
case partition_status::PS_SECONDARY:
case partition_status::PS_ERROR:
_primary_states.cleanup(true);
- // only load balance will occur primary -> secondary
- // and we just stop upload and release the cold_backup_state, and let new primary to
- // upload
- set_backup_context_cancel();
- clear_cold_backup_state();
break;
case partition_status::PS_POTENTIAL_SECONDARY:
CHECK(false, "invalid execution path");
@@ -848,16 +832,6 @@
break;
case partition_status::PS_SECONDARY:
cleanup_preparing_mutations(false);
- if (config.status != partition_status::PS_SECONDARY) {
- // if primary change the ballot, secondary will update ballot from A to
- // A+1, we don't need clear cold backup context when this case
- //
- // if secondary upgrade to primary, we must cancel & clear cold_backup_state, because
- // new-primary must check whether backup is already completed by previous-primary
-
- set_backup_context_cancel();
- clear_cold_backup_state();
- }
switch (config.status) {
case partition_status::PS_PRIMARY:
init_group_check();
@@ -931,11 +905,6 @@
}
break;
case partition_status::PS_INACTIVE:
- if (config.status != partition_status::PS_PRIMARY || !_inactive_is_transient) {
- // except for case 1, we need stop uploading backup checkpoint
- set_backup_context_cancel();
- clear_cold_backup_state();
- }
switch (config.status) {
case partition_status::PS_PRIMARY:
CHECK(_inactive_is_transient, "must be in transient state for being primary next");
diff --git a/src/replica/replica_init.cpp b/src/replica/replica_init.cpp
index 8779802..35b62ef 100644
--- a/src/replica/replica_init.cpp
+++ b/src/replica/replica_init.cpp
@@ -30,7 +30,6 @@
#include <memory>
#include <string>
-#include "backup/replica_backup_manager.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "common/replication_other_types.h"
@@ -241,8 +240,6 @@
std::chrono::seconds(FLAGS_checkpoint_interval_seconds),
get_gpid().thread_hash());
}
-
- _backup_mgr->start_collect_backup_info();
}
}
diff --git a/src/replica/replica_restore.cpp b/src/replica/replica_restore.cpp
index 063a294..cb9e644 100644
--- a/src/replica/replica_restore.cpp
+++ b/src/replica/replica_restore.cpp
@@ -26,7 +26,6 @@
#include <utility>
#include <vector>
-#include "backup/cold_backup_context.h"
#include "backup_types.h"
#include "block_service/block_service.h"
#include "block_service/block_service_manager.h"
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 4f6f73c..7796d98 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -42,7 +42,6 @@
#include <vector>
#include "absl/strings/string_view.h"
-#include "backup/replica_backup_server.h"
#include "bulk_load/replica_bulk_loader.h"
#include "common/backup_common.h"
#include "common/duplication_common.h"
@@ -677,8 +676,6 @@
_duplication_sync_timer->start();
}
- _backup_server = std::make_unique<replica_backup_server>(this);
-
// init liveness monitor
CHECK_EQ(NS_Disconnected, _state);
if (!FLAGS_fd_disabled) {
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 0370c7d..7d4deb7 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -76,6 +76,7 @@
class command_deregister;
class message_ex;
class nfs_node;
+
namespace security {
class kms_key_provider;
} // namespace security
@@ -111,7 +112,6 @@
namespace test {
class test_checker;
}
-class cold_backup_context;
class replica_split_manager;
typedef std::unordered_map<gpid, replica_ptr> replicas;
@@ -124,7 +124,6 @@
typedef dsn::ref_ptr<replica_stub> replica_stub_ptr;
class duplication_sync_timer;
-class replica_backup_server;
// The replica_stub is the *singleton* entry to access all replica managed in the same process
// replica_stub(singleton) --> replica --> replication_app_base
@@ -419,7 +418,6 @@
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::replica;
friend class ::dsn::replication::potential_secondary_context;
- friend class ::dsn::replication::cold_backup_context;
friend class replica_duplicator;
friend class replica_http_service;
@@ -476,7 +474,6 @@
::dsn::task_ptr _mem_release_timer_task;
std::unique_ptr<duplication_sync_timer> _duplication_sync_timer;
- std::unique_ptr<replica_backup_server> _backup_server;
std::unique_ptr<dsn::security::kms_key_provider> _key_provider;
// command_handlers
diff --git a/src/replica/test/backup_block_service_mock.h b/src/replica/test/backup_block_service_mock.h
deleted file mode 100644
index 4ac8bc9..0000000
--- a/src/replica/test/backup_block_service_mock.h
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <iostream>
-#include "utils/filesystem.h"
-#include "utils/error_code.h"
-#include "utils/threadpool_code.h"
-#include "runtime/task/task_code.h"
-#include "common/gpid.h"
-#include "runtime/task/task_tracker.h"
-#include "block_service/block_service.h"
-#include "replica/replica_context.h"
-#include "replication_service_test_app.h"
-#include "block_service/test/block_service_mock.h"
-#include "common/backup_common.h"
-
-using namespace ::dsn;
-using namespace ::dsn::dist::block_service;
-using namespace ::dsn::replication;
-
-extern ref_ptr<block_file_mock> current_chkpt_file;
-extern ref_ptr<block_file_mock> backup_metadata_file;
-extern ref_ptr<block_file_mock> regular_file;
-
-class backup_block_service_mock : public block_service_mock
-{
-public:
- virtual dsn::task_ptr create_file(const create_file_request &req,
- dsn::task_code code,
- const create_file_callback &cb,
- dsn::task_tracker *tracker = nullptr)
- {
- create_file_response resp;
- if (enable_create_file_fail) {
- resp.err = ERR_MOCK_INTERNAL;
- } else {
- resp.err = ERR_OK;
- auto it = files.find(req.file_name);
- if (it != files.end()) {
- resp.file_handle =
- new block_file_mock(req.file_name, it->second.first, it->second.second);
- } else {
- std::string filename = ::dsn::utils::filesystem::get_file_name(req.file_name);
- if (filename == cold_backup_constant::CURRENT_CHECKPOINT) {
- resp.file_handle = current_chkpt_file;
- std::cout << "current_ckpt_file is selected..." << std::endl;
- } else if (filename == cold_backup_constant::BACKUP_METADATA) {
- resp.file_handle = backup_metadata_file;
- std::cout << "backup_metadata_file is selected..." << std::endl;
- } else {
- resp.file_handle = regular_file;
- std::cout << "regular_file is selected..." << std::endl;
- }
- }
- }
-
- cb(resp);
- return task_ptr();
- }
-};
diff --git a/src/replica/test/cold_backup_context_test.cpp b/src/replica/test/cold_backup_context_test.cpp
deleted file mode 100644
index 529ac0a..0000000
--- a/src/replica/test/cold_backup_context_test.cpp
+++ /dev/null
@@ -1,456 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <atomic>
-#include <cstdint>
-#include <iostream>
-#include <map>
-#include <memory>
-#include <string>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "backup_block_service_mock.h"
-#include "backup_types.h"
-#include "block_service/block_service.h"
-#include "block_service/test/block_service_mock.h"
-#include "common/backup_common.h"
-#include "common/gpid.h"
-#include "common/json_helper.h"
-#include "gtest/gtest.h"
-#include "metadata_types.h"
-#include "replica/backup/cold_backup_context.h"
-#include "replica/replica.h"
-#include "replica/test/replication_service_test_app.h"
-#include "utils/autoref_ptr.h"
-#include "utils/blob.h"
-#include "utils/filesystem.h"
-
-ref_ptr<block_file_mock> current_chkpt_file = new block_file_mock("", 0, "");
-ref_ptr<block_file_mock> backup_metadata_file = new block_file_mock("", 0, "");
-ref_ptr<block_file_mock> regular_file = new block_file_mock("", 0, "");
-
-static std::string backup_root = "root";
-static backup_request request;
-static int32_t concurrent_uploading_file_cnt = 1;
-std::shared_ptr<backup_block_service_mock> block_service =
- std::make_shared<backup_block_service_mock>();
-
-void replication_service_test_app::check_backup_on_remote_test()
-{
- cold_backup_context_ptr backup_context =
- new cold_backup_context(nullptr, request, concurrent_uploading_file_cnt);
-
- backup_context->start_check();
- backup_context->block_service = block_service.get();
- backup_context->backup_root = backup_root;
- // case1 : current_chkpt_file don't exist
- {
- std::cout << "testing current_chkpt_file don't exist..." << std::endl;
- backup_context->check_backup_on_remote();
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupChecked);
- }
- backup_context->_status.store(cold_backup_status::ColdBackupChecking);
- // case2 : create current_chkpt_file fail
- {
- std::cout << "testing create current_chkpt_file fail..." << std::endl;
- block_service->enable_create_file_fail = true;
- std::cout << "ref_counter = " << backup_context->get_count() << std::endl;
- backup_context->check_backup_on_remote();
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupFailed);
- block_service->enable_create_file_fail = false;
- std::cout << "ref_counter = " << backup_context->get_count() << std::endl;
- }
- // case3 : current_chkpt_file exist
- // this case will call read_current_chkpt_file, so we make read_current_chkpt_file fail to stop
- backup_context->_status.store(cold_backup_status::ColdBackupChecking);
- {
- std::cout << "testing read current_chkpt_file fail..." << std::endl;
- current_chkpt_file->enable_read_fail = true;
- // so current_chkpt_file must exit
- current_chkpt_file->file_exist("123", 123);
- backup_context->check_backup_on_remote();
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupFailed);
- current_chkpt_file->enable_read_fail = false;
- current_chkpt_file->clear_file_exist();
- }
- ASSERT_TRUE(backup_context->get_count() == 1);
- ASSERT_TRUE(current_chkpt_file->get_count() == 1);
- ASSERT_TRUE(backup_metadata_file->get_count() == 1);
- ASSERT_TRUE(regular_file->get_count() == 1);
-}
-
-void replication_service_test_app::read_current_chkpt_file_test()
-{
- cold_backup_context_ptr backup_context =
- new cold_backup_context(nullptr, request, concurrent_uploading_file_cnt);
- backup_context->_status.store(cold_backup_status::ColdBackupChecking);
- backup_context->block_service = block_service.get();
- backup_context->backup_root = backup_root;
- // read current_chkpt_file fail has been already tested in check_backup_on_remote_test()
- // case1: current_chkpt_file is not exist
- {
- std::cout << "testing read_current_chkpt_file(file not exist)..." << std::endl;
- current_chkpt_file->clear_file_exist();
- block_file_ptr file_handle = current_chkpt_file.get();
- backup_context->read_current_chkpt_file(file_handle);
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupChecked);
- }
- backup_context->_status.store(cold_backup_status::ColdBackupChecking);
- // case2: current_chkpt_file exist
- // this case will call remote_chkpt_dir_exist(), so we make list_dir fail to stop
- {
- std::cout
- << "testing read_current_chkpt_file(file exist and check whether chkpt_dir is exist)..."
- << std::endl;
- current_chkpt_file->file_exist("123", 10);
- current_chkpt_file->set_context("test_dir");
- block_service->enable_list_dir_fail = true;
- block_file_ptr file_handle = current_chkpt_file.get();
- backup_context->read_current_chkpt_file(file_handle);
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupFailed);
- current_chkpt_file->clear_file_exist();
- current_chkpt_file->clear_context();
- block_service->enable_list_dir_fail = false;
- }
- ASSERT_TRUE(backup_context->get_count() == 1);
- ASSERT_TRUE(current_chkpt_file->get_count() == 1);
- ASSERT_TRUE(backup_metadata_file->get_count() == 1);
- ASSERT_TRUE(regular_file->get_count() == 1);
-}
-
-void replication_service_test_app::remote_chkpt_dir_exist_test()
-{
- gpid mock_gpid(1, 2);
- std::string mock_app_name("mock_app");
- int64_t mock_backup_id(1000);
- std::string mock_backup_provider_name("mock_backup_provider_name");
- request.__set_pid(mock_gpid);
- request.__set_app_name(mock_app_name);
- request.__set_backup_id(mock_backup_id);
- policy_info mock_policy_info;
- mock_policy_info.__set_backup_provider_type("mock_service");
- mock_policy_info.__set_policy_name("mock_policy");
- request.__set_policy(mock_policy_info);
- // the case that list_dir fail has been already tested
- cold_backup_context_ptr backup_context =
- new cold_backup_context(nullptr, request, concurrent_uploading_file_cnt);
-
- backup_context->start_check();
- backup_context->block_service = block_service.get();
- backup_context->backup_root = backup_root;
- backup_context->_status.store(cold_backup_status::ColdBackupChecking);
-
- // case1: directory is exist
- {
- std::cout << "testing remote checkpoint directory is exist..." << std::endl;
- std::string dir_name = std::string("test_dir");
- current_chkpt_file->file_exist("123", 10);
- current_chkpt_file->set_context(dir_name);
-
- std::string parent_dir = cold_backup::get_replica_backup_path(
- backup_root, mock_app_name, mock_gpid, mock_backup_id);
-
- std::vector<ls_entry> entries;
- entries.emplace_back(ls_entry{std::string(dir_name), true});
- // remote_chkpt_dir_exist() function judge whether the dir-A is exist through listing
- // the dir-A's parent path
- block_service->dir_files.insert(
- std::make_pair(::dsn::utils::filesystem::get_file_name(parent_dir), entries));
- backup_context->remote_chkpt_dir_exist(dir_name);
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupCompleted);
- current_chkpt_file->clear_file_exist();
- current_chkpt_file->clear_context();
- block_service->dir_files.clear();
- }
- backup_context->_status.store(cold_backup_status::ColdBackupChecking);
- // case2: directory is not exist
- {
- std::cout << "testing remote checkpoint directory is not exist..." << std::endl;
- std::string dir_name = std::string("test_dir");
- current_chkpt_file->file_exist("123", 10);
- current_chkpt_file->set_context(dir_name);
- backup_context->remote_chkpt_dir_exist(dir_name);
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupChecked);
- current_chkpt_file->clear_file_exist();
- current_chkpt_file->clear_context();
- }
- ASSERT_TRUE(backup_context->get_count() == 1);
- ASSERT_TRUE(current_chkpt_file->get_count() == 1);
- ASSERT_TRUE(backup_metadata_file->get_count() == 1);
- ASSERT_TRUE(regular_file->get_count() == 1);
-}
-
-void replication_service_test_app::upload_checkpoint_to_remote_test()
-{
- cold_backup_context_ptr backup_context =
- new cold_backup_context(nullptr, request, concurrent_uploading_file_cnt);
-
- backup_context->start_check();
- backup_context->block_service = block_service.get();
- backup_context->backup_root = backup_root;
- backup_context->_status.store(cold_backup_status::ColdBackupChecking);
- backup_context->_have_check_upload_status.store(false);
- backup_context->_upload_status.store(cold_backup_context::upload_status::UploadInvalid);
- backup_context->_status.store(cold_backup_status::ColdBackupUploading);
- // case1: create metadata_file fail
- {
- std::cout << "testing upload_checkpoint_to_remote, stop with create metadata fail..."
- << std::endl;
- block_service->enable_create_file_fail = true;
- backup_context->upload_checkpoint_to_remote();
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupFailed);
- block_service->enable_create_file_fail = false;
- }
- backup_context->_status.store(cold_backup_status::ColdBackupUploading);
- // case2: create metadata succeed, and metadata is exist
- // this case will stop when call read_backup_metadata with reason read file fail
- {
- std::cout << "testing upload_checkpoint_to_remote, stop with read metadata file fail..."
- << std::endl;
- backup_context->_have_check_upload_status.store(false);
- backup_context->_upload_status.store(cold_backup_context::upload_status::UploadInvalid);
- std::string md5 = "test_md5";
- int64_t size = 10;
- backup_metadata_file->enable_read_fail = true;
- backup_metadata_file->file_exist(md5, size);
- backup_context->upload_checkpoint_to_remote();
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupFailed);
- backup_metadata_file->clear_file_exist();
- backup_metadata_file->enable_read_fail = false;
- }
- backup_context->_status.store(cold_backup_status::ColdBackupUploading);
- // case3: create metadata succeed, but metadata is not exist
- // this case will stop after call write_metadata_file with write fail
- {
- std::cout << "testing upload_chekpoint_to_remote, stop with create metadata file fail..."
- << std::endl;
- backup_context->_have_check_upload_status.store(false);
- backup_context->_upload_status.store(cold_backup_context::upload_status::UploadInvalid);
- backup_metadata_file->enable_write_fail = true;
- backup_context->upload_checkpoint_to_remote();
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupFailed);
- backup_metadata_file->enable_write_fail = false;
- }
- ASSERT_TRUE(backup_context->get_count() == 1);
- ASSERT_TRUE(current_chkpt_file->get_count() == 1);
- ASSERT_TRUE(backup_metadata_file->get_count() == 1);
- ASSERT_TRUE(regular_file->get_count() == 1);
-}
-
-void replication_service_test_app::read_backup_metadata_test()
-{
- cold_backup_context_ptr backup_context =
- new cold_backup_context(nullptr, request, concurrent_uploading_file_cnt);
-
- backup_context->start_check();
- backup_context->block_service = block_service.get();
- backup_context->backup_root = backup_root;
-
- backup_context->_status.store(cold_backup_status::ColdBackupUploading);
- // case1: metadata is valid
- // stop with create current_chkpt_file fail
- {
- std::cout << "testing read_backup_metadata_file, with context of metadata is valid..."
- << std::endl;
- blob buf = ::json::json_forwarder<cold_backup_metadata>::encode(backup_context->_metadata);
- std::string context(buf.data(), buf.length());
- backup_metadata_file->set_context(context);
- backup_metadata_file->file_exist("test_md5", 10);
- block_service->enable_create_file_fail = true;
- ref_ptr<block_file> file_handle = backup_metadata_file.get();
- backup_context->read_backup_metadata(file_handle);
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupFailed);
- block_service->enable_create_file_fail = false;
- backup_metadata_file->clear_context();
- backup_metadata_file->clear_file_exist();
- }
- backup_context->_status.store(cold_backup_status::ColdBackupUploading);
- // case2: metadata is invalid
- // stop with create current_chkpt_file fail
- {
- std::cout << "testing read_backup_metada_file, with context of metadata is invalid..."
- << std::endl;
- backup_metadata_file->file_exist("test_md5", 10);
- backup_metadata_file->set_context("{\"key\":value\"");
- block_service->enable_create_file_fail = true;
- ref_ptr<block_file> file_handle = backup_metadata_file.get();
- backup_context->read_backup_metadata(file_handle);
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupFailed);
- block_service->enable_create_file_fail = false;
- backup_metadata_file->clear_file_exist();
- backup_metadata_file->clear_context();
- }
- // case3: read metadata fail
- // this case has been already tested before, here just ignore
- ASSERT_TRUE(backup_context->get_count() == 1);
- ASSERT_TRUE(current_chkpt_file->get_count() == 1);
- ASSERT_TRUE(backup_metadata_file->get_count() == 1);
- ASSERT_TRUE(regular_file->get_count() == 1);
-}
-
-void replication_service_test_app::on_upload_chkpt_dir_test()
-{
- cold_backup_context_ptr backup_context =
- new cold_backup_context(nullptr, request, concurrent_uploading_file_cnt);
-
- backup_context->start_check();
- backup_context->block_service = block_service.get();
- backup_context->backup_root = backup_root;
- backup_context->_status.store(cold_backup_status::ColdBackupUploading);
-
- backup_context->_upload_status.store(cold_backup_context::upload_status::UploadUncomplete);
- backup_context->_max_concurrent_uploading_file_cnt = 2;
- // case1: empty checkpoint file has been already tested in read_backup_metadata
- // so, here just ignore
-
- // case2: checkpoint file is not empty
- {
- std::cout << "testing on_upload_chkpt_dir with non-empty checkpoint files..." << std::endl;
- // smiulate some files, because file is not truly exist, so we must ignore testing
- // prepare_upload
- // TODO: find a better way to test prepare_upload
- std::string test_file1 = "test_file1";
- int32_t file1_size = 10;
- std::string file1_md5 = "test_file1_md5";
-
- std::string test_file2 = "test_file2";
- int32_t file2_size = 10;
- std::string file2_md5 = "test_file2_md5";
-
- backup_context->checkpoint_files.emplace_back(test_file1);
- backup_context->checkpoint_files.emplace_back(test_file2);
-
- {
- // should smiulate prepare_upload here
- backup_context->_file_remain_cnt = 2;
-
- file_meta f_meta;
- f_meta.name = test_file1;
- f_meta.md5 = file1_md5;
- f_meta.size = file1_size;
- backup_context->_file_status.insert(
- std::make_pair(test_file1, cold_backup_context::file_status::FileUploadUncomplete));
- backup_context->_file_infos.insert(
- std::make_pair(test_file1, std::make_pair(file1_size, file1_md5)));
- backup_context->checkpoint_file_total_size = file1_size;
- backup_context->_metadata.files.emplace_back(f_meta);
-
- f_meta.name = test_file2;
- f_meta.md5 = file2_md5;
- f_meta.size = file2_size;
- backup_context->_file_status.insert(
- std::make_pair(test_file2, cold_backup_context::file_status::FileUploadUncomplete));
- backup_context->_file_infos.insert(
- std::make_pair(test_file2, std::make_pair(file2_size, file2_md5)));
- backup_context->checkpoint_file_total_size += file2_size;
- backup_context->_metadata.files.emplace_back(f_meta);
- }
-
- backup_metadata_file->enable_write_fail = true;
- regular_file->size = 10;
- backup_context->on_upload_chkpt_dir();
- std::cout << cold_backup_status_to_string(backup_context->status()) << std::endl;
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupFailed);
- regular_file->clear_file_exist();
- backup_metadata_file->enable_write_fail = false;
- backup_context->_metadata.files.clear();
- backup_context->checkpoint_files.clear();
- }
- ASSERT_TRUE(backup_context->get_count() == 1);
- ASSERT_TRUE(current_chkpt_file->get_count() == 1);
- ASSERT_TRUE(backup_metadata_file->get_count() == 1);
- ASSERT_TRUE(regular_file->get_count() == 1);
-}
-
-void replication_service_test_app::write_backup_metadata_test()
-{
- cold_backup_context_ptr backup_context =
- new cold_backup_context(nullptr, request, concurrent_uploading_file_cnt);
-
- backup_context->start_check();
- backup_context->block_service = block_service.get();
- backup_context->backup_root = backup_root;
- backup_context->_status.store(cold_backup_status::ColdBackupUploading);
- // case1: create backup_metadata file fail
- // this case has been already tested
-
- // case2: create backup_metadata file succeed, but write file fail
- // this case has been already tested
-
- // case3: create backup_metadata file succeed, and write file succeed
- {
- std::cout << "create backup_metadata_file succeed, and write file succeed..." << std::endl;
- std::string test_file1 = "test_file1";
- std::string test_file2 = "test_file2";
- backup_context->_metadata.checkpoint_decree = 100;
-
- file_meta f_meta;
- f_meta.name = test_file1;
- f_meta.md5 = "test_file1_md5";
- f_meta.size = 10;
- backup_context->_metadata.files.emplace_back(f_meta);
- f_meta.name = test_file2;
- f_meta.md5 = "test_file2_md5";
- f_meta.size = 11;
- backup_context->_metadata.files.emplace_back(f_meta);
-
- blob result =
- ::json::json_forwarder<cold_backup_metadata>::encode(backup_context->_metadata);
- std::string value(result.data(), result.length());
- current_chkpt_file->enable_write_fail = true;
- backup_context->write_backup_metadata();
- std::string value_write(backup_metadata_file->context.data(),
- backup_metadata_file->context.length());
- ASSERT_TRUE(result.data() != backup_metadata_file->context.data());
- ASSERT_TRUE(value == value_write);
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupFailed);
- current_chkpt_file->enable_write_fail = false;
- backup_context->_metadata.files.clear();
- }
- ASSERT_TRUE(backup_context->get_count() == 1);
- ASSERT_TRUE(current_chkpt_file->get_count() == 1);
- ASSERT_TRUE(backup_metadata_file->get_count() == 1);
- ASSERT_TRUE(regular_file->get_count() == 1);
-}
-
-void replication_service_test_app::write_current_chkpt_file_test()
-{
- cold_backup_context_ptr backup_context =
- new cold_backup_context(nullptr, request, concurrent_uploading_file_cnt);
-
- backup_context->start_check();
- backup_context->block_service = block_service.get();
- backup_context->backup_root = backup_root;
- backup_context->_status.store(cold_backup_status::ColdBackupUploading);
- // case1: create current_chkpt_file succeed, and write succeed
- {
- std::string value = "test write_current_chkpt_file";
- backup_context->write_current_chkpt_file(value);
-
- std::string result(current_chkpt_file->context.data(),
- current_chkpt_file->context.length());
- ASSERT_TRUE(value == result);
- ASSERT_TRUE(backup_context->status() == cold_backup_status::ColdBackupCompleted);
- ASSERT_TRUE(backup_context->_progress.load() >= 1000);
- }
- ASSERT_TRUE(backup_context->get_count() == 1);
- ASSERT_TRUE(current_chkpt_file->get_count() == 1);
- ASSERT_TRUE(backup_metadata_file->get_count() == 1);
- ASSERT_TRUE(regular_file->get_count() == 1);
-}
diff --git a/src/replica/test/main.cpp b/src/replica/test/main.cpp
index aebab6b..0d16f30 100644
--- a/src/replica/test/main.cpp
+++ b/src/replica/test/main.cpp
@@ -25,38 +25,12 @@
#include "replication_service_test_app.h"
#include "runtime/app_model.h"
#include "runtime/service_app.h"
-#include "test_util/test_util.h"
#include "utils/error_code.h"
int gtest_flags = 0;
int gtest_ret = 0;
replication_service_test_app *app;
-class cold_backup_context_test : public pegasus::encrypt_data_test_base
-{
-};
-
-INSTANTIATE_TEST_SUITE_P(, cold_backup_context_test, ::testing::Values(false, true));
-
-TEST_P(cold_backup_context_test, check_backup_on_remote) { app->check_backup_on_remote_test(); }
-
-TEST_P(cold_backup_context_test, read_current_chkpt_file) { app->read_current_chkpt_file_test(); }
-
-TEST_P(cold_backup_context_test, remote_chkpt_dir_exist) { app->remote_chkpt_dir_exist_test(); }
-
-TEST_P(cold_backup_context_test, upload_checkpoint_to_remote)
-{
- app->upload_checkpoint_to_remote_test();
-}
-
-TEST_P(cold_backup_context_test, read_backup_metadata) { app->read_backup_metadata_test(); }
-
-TEST_P(cold_backup_context_test, on_upload_chkpt_dir) { app->on_upload_chkpt_dir_test(); }
-
-TEST_P(cold_backup_context_test, write_metadata_file) { app->write_backup_metadata_test(); }
-
-TEST_P(cold_backup_context_test, write_current_chkpt_file) { app->write_current_chkpt_file_test(); }
-
error_code replication_service_test_app::start(const std::vector<std::string> &args)
{
app = this;
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index 6d7725b..e007008 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -33,8 +33,6 @@
#include "replica/replica.h"
#include "replica/replica_stub.h"
-#include "replica/backup/cold_backup_context.h"
-#include "runtime/rpc/rpc_host_port.h"
DSN_DECLARE_int32(log_private_file_size_mb);
@@ -203,19 +201,6 @@
_primary_states.secondary_bulk_load_states.size() == 0);
}
- // mock cold backup related function.
- void generate_backup_checkpoint(cold_backup_context_ptr backup_context) override
- {
- if (backup_context->status() != ColdBackupCheckpointing) {
- LOG_INFO("{}: ignore generating backup checkpoint because backup_status = {}",
- backup_context->name,
- cold_backup_status_to_string(backup_context->status()));
- backup_context->ignore_checkpoint();
- return;
- }
- backup_context->complete_checkpoint();
- }
-
void update_last_durable_decree(decree decree)
{
dynamic_cast<mock_replication_app_base *>(_app.get())->set_last_durable_decree(decree);
diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp
index 7123dd8..fc1107e 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -27,8 +27,6 @@
#include <utility>
#include <vector>
-#include "backup_types.h"
-#include "common/backup_common.h"
#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replica_envs.h"
@@ -50,7 +48,6 @@
#include "replica/replication_app_base.h"
#include "replica/test/mock_utils.h"
#include "replica_test_base.h"
-#include "runtime/api_layer1.h"
#include "runtime/rpc/network.sim.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_message.h"
@@ -58,7 +55,6 @@
#include "runtime/task/task_tracker.h"
#include "utils/autoref_ptr.h"
#include "utils/defer.h"
-#include "utils/env.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
@@ -76,25 +72,15 @@
class replica_test : public replica_test_base
{
public:
- replica_test()
- : _pid(gpid(2, 1)),
- _backup_id(dsn_now_ms()),
- _provider_name("local_service"),
- _policy_name("mock_policy")
- {
- }
+ replica_test() : _pid(gpid(2, 1)) {}
void SetUp() override
{
FLAGS_enable_http_server = false;
mock_app_info();
+
_mock_replica =
stub->generate_replica_ptr(_app_info, _pid, partition_status::PS_PRIMARY, 1);
-
- // set FLAGS_cold_backup_root manually.
- // FLAGS_cold_backup_root is set by configuration "replication.cold_backup_root",
- // which is usually the cluster_name of production clusters.
- FLAGS_cold_backup_root = "test_cluster";
}
int64_t get_backup_request_count() const { return _mock_replica->get_backup_request_count(); }
@@ -148,55 +134,6 @@
_app_info.partition_count = 8;
}
- void test_on_cold_backup(const std::string user_specified_path = "")
- {
- backup_request req;
- req.pid = _pid;
- policy_info backup_policy_info;
- backup_policy_info.__set_backup_provider_type(_provider_name);
- backup_policy_info.__set_policy_name(_policy_name);
- req.policy = backup_policy_info;
- req.app_name = _app_info.app_name;
- req.backup_id = _backup_id;
- if (!user_specified_path.empty()) {
- req.__set_backup_path(user_specified_path);
- }
-
- // test cold backup could complete.
- backup_response resp;
- do {
- _mock_replica->on_cold_backup(req, resp);
- } while (resp.err == ERR_BUSY);
- ASSERT_EQ(ERR_OK, resp.err);
-
- // test checkpoint files have been uploaded successfully.
- std::string backup_root =
- dsn::utils::filesystem::path_combine(user_specified_path, FLAGS_cold_backup_root);
- std::string current_chkpt_file =
- cold_backup::get_current_chkpt_file(backup_root, req.app_name, req.pid, req.backup_id);
- ASSERT_TRUE(dsn::utils::filesystem::file_exists(current_chkpt_file));
- int64_t size = 0;
- dsn::utils::filesystem::file_size(
- current_chkpt_file, dsn::utils::FileDataType::kSensitive, size);
- ASSERT_LT(0, size);
- }
-
- error_code test_find_valid_checkpoint(const std::string user_specified_path = "")
- {
- configuration_restore_request req;
- req.app_id = _app_info.app_id;
- req.app_name = _app_info.app_name;
- req.backup_provider_name = _provider_name;
- req.cluster_name = FLAGS_cold_backup_root;
- req.time_stamp = _backup_id;
- if (!user_specified_path.empty()) {
- req.__set_restore_path(user_specified_path);
- }
-
- std::string remote_chkpt_dir;
- return _mock_replica->find_valid_checkpoint(req, remote_chkpt_dir);
- }
-
void force_update_checkpointing(bool running)
{
_mock_replica->_is_manual_emergency_checkpointing = running;
@@ -254,11 +191,6 @@
dsn::app_info _app_info;
dsn::gpid _pid;
mock_replica_ptr _mock_replica;
-
-private:
- const int64_t _backup_id;
- const std::string _provider_name;
- const std::string _policy_name;
};
INSTANTIATE_TEST_SUITE_P(, replica_test, ::testing::Values(false, true));
@@ -405,26 +337,7 @@
}
}
-TEST_P(replica_test, test_replica_backup_and_restore)
-{
- // TODO(yingchun): this test last too long time, optimize it!
- return;
- test_on_cold_backup();
- auto err = test_find_valid_checkpoint();
- ASSERT_EQ(ERR_OK, err);
-}
-
-TEST_P(replica_test, test_replica_backup_and_restore_with_specific_path)
-{
- // TODO(yingchun): this test last too long time, optimize it!
- return;
- std::string user_specified_path = "test/backup";
- test_on_cold_backup(user_specified_path);
- auto err = test_find_valid_checkpoint(user_specified_path);
- ASSERT_EQ(ERR_OK, err);
-}
-
TEST_P(replica_test, test_trigger_manual_emergency_checkpoint)
{
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(100), ERR_OK);
ASSERT_TRUE(is_checkpointing());
diff --git a/src/replica/test/replica_test_base.h b/src/replica/test/replica_test_base.h
index 9296e2a..d034e6a 100644
--- a/src/replica/test/replica_test_base.h
+++ b/src/replica/test/replica_test_base.h
@@ -72,8 +72,8 @@
mu->data.header.timestamp = decree;
mu->data.updates.emplace_back(mutation_update());
- mu->data.updates.back().code =
- RPC_COLD_BACKUP; // whatever code it is, but never be WRITE_EMPTY
+ // mu->data.updates.back().code =
+ // RPC_COLD_BACKUP; // any code is fine here, as long as it is never WRITE_EMPTY
mu->data.updates.back().data = blob::create_from_bytes(std::string(data));
mu->client_requests.push_back(nullptr);
diff --git a/src/replica/test/replication_service_test_app.h b/src/replica/test/replication_service_test_app.h
index 9d3edc3..8a7b7ee 100644
--- a/src/replica/test/replication_service_test_app.h
+++ b/src/replica/test/replication_service_test_app.h
@@ -38,15 +38,4 @@
}
virtual error_code start(const std::vector<std::string> &args) override;
virtual dsn::error_code stop(bool /*cleanup*/) { return dsn::ERR_OK; }
-
- // test for cold_backup_context
- void check_backup_on_remote_test();
- void read_current_chkpt_file_test();
- void remote_chkpt_dir_exist_test();
-
- void upload_checkpoint_to_remote_test();
- void read_backup_metadata_test();
- void on_upload_chkpt_dir_test();
- void write_backup_metadata_test();
- void write_current_chkpt_file_test();
};
diff --git a/src/test/function_test/backup_restore/test_backup_and_restore.cpp b/src/test/function_test/backup_restore/test_backup_and_restore.cpp
index 3eab10c..eba0e0d 100644
--- a/src/test/function_test/backup_restore/test_backup_and_restore.cpp
+++ b/src/test/function_test/backup_restore/test_backup_and_restore.cpp
@@ -90,9 +90,13 @@
const std::string backup_restore_test::s_new_app_name = "new_app";
const std::string backup_restore_test::s_provider_type = "local_service";
-TEST_F(backup_restore_test, test_backup_and_restore) { test_backup_and_restore(); }
+// TODO(heyuchen): re-enable these tests once the new backup implementation is available
+TEST_F(backup_restore_test, test_backup_and_restore)
+{
+ // test_backup_and_restore();
+}
TEST_F(backup_restore_test, test_backup_and_restore_with_user_specified_path)
{
- test_backup_and_restore("test/path");
+ // test_backup_and_restore("test/path");
}
diff --git a/src/test/function_test/restore/test_restore.cpp b/src/test/function_test/restore/test_restore.cpp
index 42e1316..5a3e375 100644
--- a/src/test/function_test/restore/test_restore.cpp
+++ b/src/test/function_test/restore/test_restore.cpp
@@ -60,9 +60,11 @@
NO_FATALS(write_data(kTestCount));
std::vector<int32_t> app_ids({table_id_});
- ASSERT_EQ(
- ERR_OK,
- ddl_client_->add_backup_policy("policy_1", "local_service", app_ids, 86400, 6, "24:0"));
+ // gns: backup policy creation is disabled along with the removed backup implementation
+ // ASSERT_EQ(
+ // ERR_OK,
+ // ddl_client_->add_backup_policy("policy_1", "local_service", app_ids, 86400, 6,
+ // "24:0"));
}
void TearDown() override { ASSERT_EQ(ERR_OK, ddl_client_->drop_app(table_name_, 0)); }