| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "cold_backup_context.h" |
| |
| #include <chrono> |
| #include <cstdint> |
| #include <memory> |
| // IWYU pragma: no_include <type_traits> |
| |
| #include "common/backup_common.h" |
| #include "common/replication.codes.h" |
| #include "replica/replica.h" |
| #include "runtime/api_layer1.h" |
| #include "runtime/task/async_calls.h" |
| #include "utils/blob.h" |
| #include "utils/error_code.h" |
| #include "utils/filesystem.h" |
| #include "utils/metrics.h" |
| #include "utils/utils.h" |
| |
| namespace dsn { |
| namespace replication { |
| |
| const char *cold_backup_status_to_string(cold_backup_status status) |
| { |
| switch (status) { |
| case ColdBackupInvalid: |
| return "ColdBackupInvalid"; |
| case ColdBackupChecking: |
| return "ColdBackupChecking"; |
| case ColdBackupChecked: |
| return "ColdBackupChecked"; |
| case ColdBackupCheckpointing: |
| return "ColdBackupCheckpointing"; |
| case ColdBackupCheckpointed: |
| return "ColdBackupCheckpointed"; |
| case ColdBackupUploading: |
| return "ColdBackupUploading"; |
| case ColdBackupPaused: |
| return "ColdBackupPaused"; |
| case ColdBackupCanceled: |
| return "ColdBackupCanceled"; |
| case ColdBackupCompleted: |
| return "ColdBackupCompleted"; |
| case ColdBackupFailed: |
| return "ColdBackupFailed"; |
| default: |
| CHECK(false, ""); |
| } |
| return "ColdBackupXXX"; |
| } |
| |
| void cold_backup_context::cancel() |
| { |
| _status.store(ColdBackupCanceled); |
| if (_owner_replica != nullptr) { |
| METRIC_INCREMENT(*_owner_replica, backup_cancelled_count); |
| } |
| } |
| |
| bool cold_backup_context::start_check() |
| { |
| int invalid = ColdBackupInvalid; |
| if (_status.compare_exchange_strong(invalid, ColdBackupChecking)) { |
| _start_time_ms = dsn_now_ms(); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| bool cold_backup_context::fail_check(const char *failure_reason) |
| { |
| int checking = ColdBackupChecking; |
| if (_status.compare_exchange_strong(checking, ColdBackupFailed)) { |
| strncpy(_reason, failure_reason, sizeof(_reason) - 1); |
| _reason[sizeof(_reason) - 1] = '\0'; |
| if (_owner_replica != nullptr) { |
| METRIC_INCREMENT(*_owner_replica, backup_failed_count); |
| } |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| bool cold_backup_context::complete_check(bool uploaded) |
| { |
| int checking = ColdBackupChecking; |
| if (uploaded) { |
| _progress.store(cold_backup_constant::PROGRESS_FINISHED); |
| if (_owner_replica != nullptr) { |
| METRIC_INCREMENT(*_owner_replica, backup_successful_count); |
| } |
| return _status.compare_exchange_strong(checking, ColdBackupCompleted); |
| } else { |
| return _status.compare_exchange_strong(checking, ColdBackupChecked); |
| } |
| } |
| |
| bool cold_backup_context::start_checkpoint() |
| { |
| int checked = ColdBackupChecked; |
| if (_status.compare_exchange_strong(checked, ColdBackupCheckpointing)) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| bool cold_backup_context::fail_checkpoint(const char *failure_reason) |
| { |
| int checkpointing = ColdBackupCheckpointing; |
| if (_status.compare_exchange_strong(checkpointing, ColdBackupFailed)) { |
| strncpy(_reason, failure_reason, sizeof(_reason) - 1); |
| _reason[sizeof(_reason) - 1] = '\0'; |
| if (_owner_replica != nullptr) { |
| METRIC_INCREMENT(*_owner_replica, backup_failed_count); |
| } |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| bool cold_backup_context::complete_checkpoint() |
| { |
| int checkpointing = ColdBackupCheckpointing; |
| if (_status.compare_exchange_strong(checkpointing, ColdBackupCheckpointed)) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| bool cold_backup_context::fail_upload(const char *failure_reason) |
| { |
| int uploading = ColdBackupUploading; |
| int paused = ColdBackupPaused; |
| if (_status.compare_exchange_strong(uploading, ColdBackupFailed) || |
| _status.compare_exchange_strong(paused, ColdBackupFailed)) { |
| strncpy(_reason, failure_reason, sizeof(_reason) - 1); |
| _reason[sizeof(_reason) - 1] = '\0'; |
| if (_owner_replica != nullptr) { |
| METRIC_INCREMENT(*_owner_replica, backup_failed_count); |
| } |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| bool cold_backup_context::complete_upload() |
| { |
| int uploading = ColdBackupUploading; |
| int paused = ColdBackupPaused; |
| if (_status.compare_exchange_strong(uploading, ColdBackupCompleted) || |
| _status.compare_exchange_strong(paused, ColdBackupCompleted)) { |
| _progress.store(cold_backup_constant::PROGRESS_FINISHED); |
| if (_owner_replica != nullptr) { |
| METRIC_INCREMENT(*_owner_replica, backup_successful_count); |
| } |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
// run in REPLICATION_LONG thread
//
// First step of the remote check: asks the block service for the "current checkpoint"
// file of this backup and dispatches on the answer:
//   - status no longer ready for check -> ignore_check()
//   - ERR_OK, file has no md5 and non-positive size -> complete_check(false)
//   - ERR_OK, file has content -> read_current_chkpt_file()
//   - ERR_TIMEOUT -> retry this function after 10 seconds
//   - any other error -> fail_check()
//
// Reference counting: add_ref() is called before every asynchronous hop and the matching
// release_ref() is the last statement of the callback/task that hop runs.
void cold_backup_context::check_backup_on_remote()
{
    // check whether the current checkpoint file exists on remote, and verify whether the
    // checkpoint directory exists
    std::string current_chkpt_file = cold_backup::get_current_chkpt_file(
        backup_root, request.app_name, request.pid, request.backup_id);
    dist::block_service::create_file_request req;
    req.file_name = current_chkpt_file;
    req.ignore_metadata = false;

    // increase the ref counter; release_ref() must be called after the callback has executed
    add_ref();

    block_service->create_file(
        std::move(req),
        LPC_BACKGROUND_COLD_BACKUP,
        [this, current_chkpt_file](const dist::block_service::create_file_response &resp) {
            if (!is_ready_for_check()) {
                LOG_INFO("{}: backup status has changed to {}, ignore checking backup on remote",
                         name,
                         cold_backup_status_to_string(status()));
                ignore_check();
            } else if (resp.err == ERR_OK) {
                const dist::block_service::block_file_ptr &file_handle = resp.file_handle;
                CHECK_NOTNULL(file_handle, "");
                // an empty md5 together with a non-positive size is treated as "the file does
                // not exist on remote"
                if (file_handle->get_md5sum().empty() && file_handle->get_size() <= 0) {
                    LOG_INFO("{}: check backup on remote, current_checkpoint file {} is not exist",
                             name,
                             current_chkpt_file);
                    complete_check(false);
                } else {
                    LOG_INFO("{}: check backup on remote, current_checkpoint file {} is exist",
                             name,
                             current_chkpt_file);
                    read_current_chkpt_file(file_handle);
                }
            } else if (resp.err == ERR_TIMEOUT) {
                LOG_ERROR(
                    "{}: block service create file timeout, retry after 10 seconds, file = {}",
                    name,
                    current_chkpt_file);

                // before retry, add_ref() is needed; the retry task calls release_ref() when
                // it is done
                add_ref();

                tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
                                 nullptr,
                                 [this]() {
                                     // before retry, check whether the status is still ready
                                     // for check
                                     if (!is_ready_for_check()) {
                                         LOG_INFO("{}: backup status has changed to {}, ignore "
                                                  "checking backup on remote",
                                                  name,
                                                  cold_backup_status_to_string(status()));
                                         ignore_check();
                                     } else {
                                         check_backup_on_remote();
                                     }
                                     release_ref();
                                 },
                                 0,
                                 std::chrono::seconds(10));
            } else {
                LOG_ERROR("{}: block service create file failed, file = {}, err = {}",
                          name,
                          current_chkpt_file,
                          resp.err);
                fail_check("block service create file failed");
            }
            // pairs with the add_ref() made before create_file()
            release_ref();
        });
}
| |
// Second step of the remote check: reads the content of the "current checkpoint" file and
// interprets it as the name of the remote checkpoint directory.
//   - status no longer ready for check -> ignore_check()
//   - ERR_OK, empty content -> complete_check(false)
//   - ERR_OK, non-empty content -> remote_chkpt_dir_exist(<content>)
//   - ERR_TIMEOUT -> retry this function after 10 seconds
//   - any other error -> fail_check()
//
// `file_handle` is captured by value in the callbacks so it stays usable across retries.
// add_ref() precedes every asynchronous hop; the callback/task releases it as its last step.
void cold_backup_context::read_current_chkpt_file(
    const dist::block_service::block_file_ptr &file_handle)
{
    dist::block_service::read_request req;
    req.remote_pos = 0;
    // NOTE(review): -1 presumably means "read the whole file" -- confirm against the
    // block service read_request contract.
    req.remote_length = -1;

    add_ref();

    file_handle->read(
        std::move(req),
        LPC_BACKGROUND_COLD_BACKUP,
        [this, file_handle](const dist::block_service::read_response &resp) {
            if (!is_ready_for_check()) {
                LOG_INFO("{}: backup status has changed to {}, ignore checking backup on remote",
                         name,
                         cold_backup_status_to_string(status()));
                ignore_check();
            } else if (resp.err == ERR_OK) {
                // the whole buffer is taken as the checkpoint directory name
                std::string chkpt_dirname(resp.buffer.data(), resp.buffer.length());
                if (chkpt_dirname.empty()) {
                    complete_check(false);
                } else {
                    LOG_INFO("{}: after read current_checkpoint_file, check whether remote "
                             "checkpoint dir = {} is exist",
                             name,
                             chkpt_dirname);
                    remote_chkpt_dir_exist(chkpt_dirname);
                }
            } else if (resp.err == ERR_TIMEOUT) {
                LOG_ERROR("{}: read remote file timeout, retry after 10s, file = {}",
                          name,
                          file_handle->file_name());
                // held by the delayed retry task below, which releases it when done
                add_ref();

                tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
                                 nullptr,
                                 [this, file_handle]() {
                                     if (!is_ready_for_check()) {
                                         LOG_INFO("{}: backup status has changed to {}, ignore "
                                                  "checking backup on remote",
                                                  name,
                                                  cold_backup_status_to_string(status()));
                                         ignore_check();
                                     } else {
                                         read_current_chkpt_file(file_handle);
                                     }
                                     release_ref();
                                 },
                                 0,
                                 std::chrono::seconds(10));
            } else {
                LOG_ERROR("{}: read remote file failed, file = {}, err = {}",
                          name,
                          file_handle->file_name(),
                          resp.err);
                fail_check("read remote file failed");
            }
            // pairs with the add_ref() made before read()
            release_ref();
            return;
        });
}
| |
// Third step of the remote check: lists the replica's backup path on the block service and
// looks for a directory entry named `chkpt_dirname`.
//   - status no longer ready for check -> ignore_check()
//   - ERR_OK, directory found -> complete_check(true)  (upload already done)
//   - ERR_OK, directory missing -> complete_check(false) (needs re-upload)
//   - ERR_OBJECT_NOT_FOUND -> complete_check(false)
//   - ERR_TIMEOUT -> retry this function after 10 seconds
//   - any other error -> fail_check()
//
// add_ref() precedes every asynchronous hop; the callback/task releases it as its last step.
void cold_backup_context::remote_chkpt_dir_exist(const std::string &chkpt_dirname)
{
    dist::block_service::ls_request req;
    req.dir_name = cold_backup::get_replica_backup_path(
        backup_root, request.app_name, request.pid, request.backup_id);

    add_ref();

    block_service->list_dir(
        std::move(req),
        LPC_BACKGROUND_COLD_BACKUP,
        [this, chkpt_dirname](const dist::block_service::ls_response &resp) {
            if (!is_ready_for_check()) {
                LOG_INFO("{}: backup status has changed to {}, ignore checking backup on remote",
                         name,
                         cold_backup_status_to_string(status()));
                ignore_check();
            } else if (resp.err == ERR_OK) {
                // only a *directory* entry with exactly this name counts as a match
                bool found_chkpt_dir = false;
                for (const auto &entry : (*resp.entries)) {
                    if (entry.is_directory && entry.entry_name == chkpt_dirname) {
                        found_chkpt_dir = true;
                        break;
                    }
                }
                if (found_chkpt_dir) {
                    LOG_INFO("{}: remote checkpoint dir is already exist, so upload have already "
                             "complete, remote_checkpoint_dirname = {}",
                             name,
                             chkpt_dirname);
                    complete_check(true);
                } else {
                    LOG_INFO("{}: remote checkpoint dir is not exist, should re-upload checkpoint "
                             "dir, remote_checkpoint_dirname = {}",
                             name,
                             chkpt_dirname);
                    complete_check(false);
                }
            } else if (resp.err == ERR_OBJECT_NOT_FOUND) {
                // the listed path itself is missing, so the checkpoint dir cannot exist either
                LOG_INFO("{}: remote checkpoint dir is not exist, should re-upload checkpoint dir, "
                         "remote_checkpoint_dirname = {}",
                         name,
                         chkpt_dirname);
                complete_check(false);
            } else if (resp.err == ERR_TIMEOUT) {
                LOG_ERROR(
                    "{}: block service list remote dir timeout, retry after 10s, dirname = {}",
                    name,
                    chkpt_dirname);
                // held by the delayed retry task below, which releases it when done
                add_ref();

                tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
                                 nullptr,
                                 [this, chkpt_dirname]() {
                                     if (!is_ready_for_check()) {
                                         LOG_INFO("{}: backup status has changed to {}, ignore "
                                                  "checking backup on remote",
                                                  name,
                                                  cold_backup_status_to_string(status()));
                                         ignore_check();
                                     } else {
                                         remote_chkpt_dir_exist(chkpt_dirname);
                                     }
                                     release_ref();
                                 },
                                 0,
                                 std::chrono::seconds(10));
            } else {
                LOG_ERROR("{}: block service list remote dir failed, dirname = {}, err = {}",
                          name,
                          chkpt_dirname,
                          resp.err);
                fail_check("list remote dir failed");
            }
            // pairs with the add_ref() made before list_dir()
            release_ref();
            return;
        });
}
| |
// Entry point of the upload phase.
//
// The first caller to flip _have_check_upload_status from false to true checks whether the
// remote backup metadata file already exists:
//   - ERR_OK, file has no md5 and non-positive size -> _upload_status = UploadUncomplete,
//     then on_upload_chkpt_dir()
//   - ERR_OK, file has content -> read_backup_metadata()
//   - ERR_TIMEOUT -> reset _have_check_upload_status and retry this function after 10 seconds
//   - any other error -> reset _have_check_upload_status and fail_upload()
// Any later caller (flag already true) skips the check and goes straight to
// on_upload_chkpt_dir().
//
// Does nothing when the status is not ready for upload. add_ref() precedes every
// asynchronous hop; the callback/task releases it as its last step.
void cold_backup_context::upload_checkpoint_to_remote()
{
    if (!is_ready_for_upload()) {
        LOG_INFO("{}: backup status has changed to {}, ignore upload checkpoint",
                 name,
                 cold_backup_status_to_string(status()));
        return;
    }

    bool old_status = false;
    // Only one task is allowed to check the upload status, and it sets _upload_status based
    // on what it finds. Because upload_checkpoint_to_remote may be called multiple times
    // (e.g. pause -> uploading), an atomic flag is used to pick that single task.
    if (!_have_check_upload_status.compare_exchange_strong(old_status, true)) {
        LOG_INFO("{}: upload status has already been checked, start upload checkpoint dir directly",
                 name);
        on_upload_chkpt_dir();
        return;
    }

    // check whether cold_backup_metadata exists, and verify it if it does
    std::string metadata = cold_backup::get_remote_chkpt_meta_file(
        backup_root, request.app_name, request.pid, request.backup_id);
    dist::block_service::create_file_request req;
    req.file_name = metadata;
    req.ignore_metadata = false;

    add_ref();

    block_service->create_file(
        std::move(req),
        LPC_BACKGROUND_COLD_BACKUP,
        [this, metadata](const dist::block_service::create_file_response &resp) {
            if (resp.err == ERR_OK) {
                CHECK_NOTNULL(resp.file_handle, "");
                // an empty md5 together with a non-positive size is treated as "metadata file
                // does not exist on remote"
                if (resp.file_handle->get_md5sum().empty() && resp.file_handle->get_size() <= 0) {
                    _upload_status.store(UploadUncomplete);
                    LOG_INFO("{}: check upload_status complete, cold_backup_metadata isn't exist, "
                             "start upload checkpoint dir",
                             name);
                    on_upload_chkpt_dir();
                } else {
                    LOG_INFO("{}: cold_backup_metadata is exist, read it's context", name);
                    read_backup_metadata(resp.file_handle);
                }
            } else if (resp.err == ERR_TIMEOUT) {
                LOG_ERROR("{}: block service create file timeout, retry after 10s, file = {}",
                          name,
                          metadata);
                // when creating backup_metadata times out, reset _have_check_upload_status to
                // false so the check can be performed again
                _have_check_upload_status.store(false);
                // held by the delayed retry task below, which releases it when done
                add_ref();

                tasking::enqueue(
                    LPC_BACKGROUND_COLD_BACKUP,
                    nullptr,
                    [this]() {
                        if (!is_ready_for_upload()) {
                            LOG_INFO(
                                "{}: backup status has changed to {}, stop check upload status",
                                name,
                                cold_backup_status_to_string(status()));
                        } else {
                            upload_checkpoint_to_remote();
                        }
                        release_ref();
                    },
                    0,
                    std::chrono::seconds(10));
            } else {
                LOG_ERROR("{}: block service create file failed, file = {}, err = {}",
                          name,
                          metadata,
                          resp.err);
                // allow a later call to perform the check again
                _have_check_upload_status.store(false);
                fail_upload("block service create file failed");
            }
            // pairs with the add_ref() made before create_file()
            release_ref();
            return;
        });
}
| |
// Reads the remote backup metadata file behind `file_handle`.
//   - ERR_OK -> verify_backup_metadata(<content>) (which sets _upload_status), then
//     on_upload_chkpt_dir()
//   - ERR_TIMEOUT -> retry this function after 10 seconds; if the status is no longer ready
//     for upload by then, reset _have_check_upload_status instead
//   - any other error -> reset _have_check_upload_status and fail_upload()
//
// `file_handle` is captured by value in the callbacks so it stays usable across retries.
// add_ref() precedes every asynchronous hop; the callback/task releases it as its last step.
void cold_backup_context::read_backup_metadata(
    const dist::block_service::block_file_ptr &file_handle)
{
    dist::block_service::read_request req;
    req.remote_pos = 0;
    // NOTE(review): -1 presumably means "read the whole file" -- confirm against the
    // block service read_request contract.
    req.remote_length = -1;

    add_ref();

    file_handle->read(
        std::move(req),
        LPC_BACKGROUND_COLD_BACKUP,
        [this, file_handle](const dist::block_service::read_response &resp) {
            if (resp.err == ERR_OK) {
                LOG_INFO("{}: read cold_backup_metadata succeed, verify it's context, file = {}",
                         name,
                         file_handle->file_name());
                verify_backup_metadata(resp.buffer);
                on_upload_chkpt_dir();
            } else if (resp.err == ERR_TIMEOUT) {
                LOG_ERROR("{}: read remote file timeout, retry after 10s, file = {}",
                          name,
                          file_handle->file_name());
                // held by the delayed retry task below, which releases it when done
                add_ref();

                tasking::enqueue(
                    LPC_BACKGROUND_COLD_BACKUP,
                    nullptr,
                    [this, file_handle] {
                        if (!is_ready_for_upload()) {
                            LOG_INFO(
                                "{}: backup status has changed to {}, stop check upload status",
                                name,
                                cold_backup_status_to_string(status()));
                            // the check never finished, so let a later upload attempt redo it
                            _have_check_upload_status.store(false);
                        } else {
                            read_backup_metadata(file_handle);
                        }
                        release_ref();
                    },
                    0,
                    std::chrono::seconds(10));
            } else {
                LOG_ERROR("{}: read remote file failed, file = {}, err = {}",
                          name,
                          file_handle->file_name(),
                          resp.err);
                // allow a later call to perform the check again
                _have_check_upload_status.store(false);
                fail_upload("read remote file failed");
            }
            // pairs with the add_ref() made before read()
            release_ref();
            return;
        });
}
| |
| void cold_backup_context::verify_backup_metadata(const blob &value) |
| { |
| cold_backup_metadata tmp; |
| if (value.length() > 0 && json::json_forwarder<cold_backup_metadata>::decode(value, tmp)) { |
| LOG_INFO("{}: check upload status complete, checkpoint dir uploading has already complete", |
| name); |
| _upload_status.store(UploadComplete); |
| } else { |
| LOG_INFO("{}: check upload status complete, checkpoint dir uploading isn't complete yet", |
| name); |
| _upload_status.store(UploadUncomplete); |
| } |
| } |
| |
| void cold_backup_context::on_upload_chkpt_dir() |
| { |
| if (_upload_status.load() == UploadInvalid || !is_ready_for_upload()) { |
| LOG_INFO("{}: replica is not ready for uploading, ignore upload, cold_backup_status({})", |
| name, |
| cold_backup_status_to_string(status())); |
| return; |
| } |
| |
| if (_upload_status.load() == UploadComplete) { |
| // TODO: if call upload_checkpint_to_remote multi times, maybe write_current_chkpt_file |
| // multi times |
| std::string chkpt_dirname = cold_backup::get_remote_chkpt_dirname(); |
| write_current_chkpt_file(chkpt_dirname); |
| return; |
| } |
| |
| prepare_upload(); |
| |
| // prepare_upload maybe fail, so here check status |
| if (!is_ready_for_upload()) { |
| LOG_ERROR("{}: backup status has changed to {}, stop upload checkpoint dir", |
| name, |
| cold_backup_status_to_string(status())); |
| return; |
| } |
| |
| if (checkpoint_files.size() <= 0) { |
| LOG_INFO("{}: checkpoint dir is empty, so upload is complete and just start write " |
| "backup_metadata", |
| name); |
| bool old_status = false; |
| // using atomic variant _have_write_backup_metadata is to allow one task to |
| // write backup_metadata because on_upload_chkpt_dir maybe call multi-time |
| if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) { |
| write_backup_metadata(); |
| } |
| } else { |
| LOG_INFO("{}: start upload checkpoint dir, checkpoint dir = {}, total checkpoint file = {}", |
| name, |
| checkpoint_dir, |
| checkpoint_files.size()); |
| std::vector<std::string> files; |
| if (!upload_complete_or_fetch_uncomplete_files(files)) { |
| for (auto &file : files) { |
| LOG_INFO("{}: start upload checkpoint file to remote, file = {}", name, file); |
| upload_file(file); |
| } |
| } else { |
| LOG_INFO("{}: upload checkpoint dir to remote complete, total_file_cnt = {}", |
| name, |
| checkpoint_files.size()); |
| bool old_status = false; |
| if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) { |
| write_backup_metadata(); |
| } |
| } |
| } |
| } |
| |
| void cold_backup_context::prepare_upload() |
| { |
| zauto_lock l(_lock); |
| // only need initialize once |
| if (_metadata.files.size() > 0) { |
| return; |
| } |
| _file_remain_cnt = checkpoint_files.size(); |
| |
| _metadata.checkpoint_decree = checkpoint_decree; |
| _metadata.checkpoint_timestamp = checkpoint_timestamp; |
| _metadata.checkpoint_total_size = checkpoint_file_total_size; |
| for (int32_t idx = 0; idx < checkpoint_files.size(); idx++) { |
| std::string &file = checkpoint_files[idx]; |
| file_meta f_meta; |
| f_meta.name = file; |
| std::string file_full_path = ::dsn::utils::filesystem::path_combine(checkpoint_dir, file); |
| int64_t file_size = checkpoint_file_sizes[idx]; |
| std::string file_md5; |
| if (::dsn::utils::filesystem::md5sum(file_full_path, file_md5) != ERR_OK) { |
| LOG_ERROR("{}: get local file size or md5 fail, file = {}", name, file_full_path); |
| fail_upload("compute local file size or md5 failed"); |
| return; |
| } |
| f_meta.md5 = file_md5; |
| f_meta.size = file_size; |
| _metadata.files.emplace_back(f_meta); |
| _file_status.insert(std::make_pair(file, FileUploadUncomplete)); |
| _file_infos.insert(std::make_pair(file, std::make_pair(file_size, file_md5))); |
| } |
| _upload_file_size.store(0); |
| } |
| |
// Starts the upload of one checkpoint file, identified by its name relative to
// checkpoint_dir.
//
// The remote file is created/opened first:
//   - ERR_OK and remote md5 + size equal the local ones recorded in _file_infos -> the file
//     is already on remote; on_upload_file_complete()
//   - ERR_OK otherwise -> on_upload() performs the actual transfer
//   - ERR_TIMEOUT -> retry after 10 seconds; if the status is no longer ready for upload by
//     then, the file is handed back via file_upload_uncomplete()
//   - any other error -> fail_upload()
// Every non-OK response also bumps backup_file_upload_failed_count when an owner replica
// is attached.
//
// add_ref() precedes every asynchronous hop; the callback/task releases it as its last step.
void cold_backup_context::upload_file(const std::string &local_filename)
{
    std::string remote_chkpt_dir = cold_backup::get_remote_chkpt_dir(
        backup_root, request.app_name, request.pid, request.backup_id);
    dist::block_service::create_file_request req;
    req.file_name = ::dsn::utils::filesystem::path_combine(remote_chkpt_dir, local_filename);
    req.ignore_metadata = false;

    add_ref();

    block_service->create_file(
        std::move(req),
        LPC_BACKGROUND_COLD_BACKUP,
        [this, local_filename](const dist::block_service::create_file_response &resp) {
            if (resp.err == ERR_OK) {
                const dist::block_service::block_file_ptr &file_handle = resp.file_handle;
                CHECK_NOTNULL(file_handle, "");
                // _file_infos maps file name -> (size, md5) of the local file
                int64_t local_file_size = _file_infos.at(local_filename).first;
                std::string md5 = _file_infos.at(local_filename).second;
                std::string full_path_local_file =
                    ::dsn::utils::filesystem::path_combine(checkpoint_dir, local_filename);
                if (md5 == file_handle->get_md5sum() &&
                    local_file_size == file_handle->get_size()) {
                    LOG_INFO("{}: checkpoint file already exist on remote, file = {}",
                             name,
                             full_path_local_file);
                    on_upload_file_complete(local_filename);
                } else {
                    LOG_INFO("{}: start upload checkpoint file to remote, file = {}",
                             name,
                             full_path_local_file);
                    on_upload(file_handle, full_path_local_file);
                }
            } else if (resp.err == ERR_TIMEOUT) {
                LOG_ERROR("{}: block service create file timeout, retry after 10s, file = {}",
                          name,
                          local_filename);
                // held by the delayed retry task below, which releases it when done
                add_ref();

                tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
                                 nullptr,
                                 [this, local_filename]() {
                                     // TODO: status change from ColdBackupUploading to
                                     // ColdBackupPaused, and upload file timeout, but when callback
                                     // is executed it catches the status(ColdBackupPaused)
                                     // now, if status back to ColdBackupUploading very soon, and
                                     // call upload_checkpoint_to_remote() here,
                                     // upload_checkpoint_to_remote() maybe acquire the _lock first,
                                     // then stop give back file(upload timeout), the file is still
                                     // in uploading this file will not be uploaded until you call
                                     // upload_checkpoint_to_remote() after it's given back
                                     if (!is_ready_for_upload()) {
                                         std::string full_path_local_file =
                                             ::dsn::utils::filesystem::path_combine(checkpoint_dir,
                                                                                    local_filename);
                                         LOG_INFO("{}: backup status has changed to {}, stop "
                                                  "upload checkpoint file to remote, file = {}",
                                                  name,
                                                  cold_backup_status_to_string(status()),
                                                  full_path_local_file);
                                         // give the file back so a later upload round can
                                         // pick it up again
                                         file_upload_uncomplete(local_filename);
                                     } else {
                                         upload_file(local_filename);
                                     }
                                     release_ref();
                                 },
                                 0,
                                 std::chrono::seconds(10));
            } else {
                LOG_ERROR("{}: block service create file failed, file = {}, err = {}",
                          name,
                          local_filename,
                          resp.err);
                fail_upload("create file failed");
            }
            // timeouts count as failed attempts too
            if (resp.err != ERR_OK && _owner_replica != nullptr) {
                METRIC_INCREMENT(*_owner_replica, backup_file_upload_failed_count);
            }
            // pairs with the add_ref() made before create_file()
            release_ref();
            return;
        });
}
| |
// Transfers the local file `full_path_local_file` to the remote file behind `file_handle`.
//   - ERR_OK -> the uploaded size must equal the size recorded in _file_infos (CHECK_EQ);
//     then on_upload_file_complete()
//   - ERR_TIMEOUT -> retry after 10 seconds; if the status is no longer ready for upload by
//     then, the file is handed back via file_upload_uncomplete()
//   - any other error -> fail_upload()
// Every non-OK response also bumps backup_file_upload_failed_count when an owner replica
// is attached.
//
// add_ref() precedes every asynchronous hop; the callback/task releases it as its last step.
void cold_backup_context::on_upload(const dist::block_service::block_file_ptr &file_handle,
                                    const std::string &full_path_local_file)
{
    dist::block_service::upload_request req;
    req.input_local_name = full_path_local_file;

    add_ref();

    file_handle->upload(
        std::move(req),
        LPC_BACKGROUND_COLD_BACKUP,
        [this, file_handle, full_path_local_file](
            const dist::block_service::upload_response &resp) {
            if (resp.err == ERR_OK) {
                // bookkeeping is keyed by the bare file name, not the full path
                std::string local_filename =
                    ::dsn::utils::filesystem::get_file_name(full_path_local_file);
                CHECK_EQ(_file_infos.at(local_filename).first,
                         static_cast<int64_t>(resp.uploaded_size));
                LOG_INFO(
                    "{}: upload checkpoint file complete, file = {}", name, full_path_local_file);
                on_upload_file_complete(local_filename);
            } else if (resp.err == ERR_TIMEOUT) {
                LOG_ERROR("{}: upload checkpoint file timeout, retry after 10s, file = {}",
                          name,
                          full_path_local_file);
                // held by the delayed retry task below, which releases it when done
                add_ref();

                tasking::enqueue(
                    LPC_BACKGROUND_COLD_BACKUP,
                    nullptr,
                    [this, file_handle, full_path_local_file]() {
                        if (!is_ready_for_upload()) {
                            LOG_ERROR("{}: backup status has changed to {}, stop upload "
                                      "checkpoint file to remote, file = {}",
                                      name,
                                      cold_backup_status_to_string(status()),
                                      full_path_local_file);
                            std::string local_filename =
                                ::dsn::utils::filesystem::get_file_name(full_path_local_file);
                            // give the file back so a later upload round can pick it up again
                            file_upload_uncomplete(local_filename);
                        } else {
                            on_upload(file_handle, full_path_local_file);
                        }
                        release_ref();
                    },
                    0,
                    std::chrono::seconds(10));
            } else {
                LOG_ERROR("{}: upload checkpoint file to remote failed, file = {}, err = {}",
                          name,
                          full_path_local_file,
                          resp.err);
                fail_upload("upload checkpoint file to remote failed");
            }
            // timeouts count as failed attempts too
            if (resp.err != ERR_OK && _owner_replica != nullptr) {
                METRIC_INCREMENT(*_owner_replica, backup_file_upload_failed_count);
            }
            // pairs with the add_ref() made before upload()
            release_ref();
            return;
        });
}
| |
// Writes the JSON-encoded _metadata to the remote backup metadata file.
//
// No-op when _upload_status is already UploadComplete. Otherwise the remote file is
// created/opened:
//   - ERR_OK -> on_write() the encoded metadata; on success _upload_status becomes
//     UploadComplete and write_current_chkpt_file() is started
//   - ERR_TIMEOUT -> retry after 10 seconds; if the status is no longer ready for upload by
//     then, reset _have_write_backup_metadata instead
//   - any other error -> reset _have_write_backup_metadata and fail_upload()
//
// add_ref() precedes every asynchronous hop; the callback/task releases it as its last step.
void cold_backup_context::write_backup_metadata()
{
    if (_upload_status.load() == UploadComplete) {
        LOG_INFO("{}: upload have already done, no need write metadata again", name);
        return;
    }
    std::string metadata = cold_backup::get_remote_chkpt_meta_file(
        backup_root, request.app_name, request.pid, request.backup_id);
    dist::block_service::create_file_request req;
    req.file_name = metadata;
    req.ignore_metadata = true;

    add_ref();

    block_service->create_file(
        std::move(req),
        LPC_BACKGROUND_COLD_BACKUP,
        [this, metadata](const dist::block_service::create_file_response &resp) {
            if (resp.err == ERR_OK) {
                CHECK_NOTNULL(resp.file_handle, "");
                blob buffer = json::json_forwarder<cold_backup_metadata>::encode(_metadata);
                // hold this context until the on_write callback below has executed
                add_ref();
                LOG_INFO("{}: create backup metadata file succeed, start to write file, file = {}",
                         name,
                         metadata);
                this->on_write(resp.file_handle, buffer, [this](bool succeed) {
                    if (succeed) {
                        std::string chkpt_dirname = cold_backup::get_remote_chkpt_dirname();
                        _upload_status.store(UploadComplete);
                        LOG_INFO(
                            "{}: write backup metadata complete, write current checkpoint file",
                            name);
                        write_current_chkpt_file(chkpt_dirname);
                    }
                    // NOTICE: a failed write is handled inside on_write(); nothing to do here
                    // besides dropping the reference
                    release_ref();
                });
            } else if (resp.err == ERR_TIMEOUT) {
                LOG_ERROR("{}: block service create file timeout, retry after 10s, file = {}",
                          name,
                          metadata);
                // held by the delayed retry task below, which releases it when done
                add_ref();

                tasking::enqueue(
                    LPC_BACKGROUND_COLD_BACKUP,
                    nullptr,
                    [this]() {
                        if (!is_ready_for_upload()) {
                            // the write never happened, so allow a later attempt to redo it
                            _have_write_backup_metadata.store(false);
                            LOG_ERROR(
                                "{}: backup status has changed to {}, stop write backup_metadata",
                                name,
                                cold_backup_status_to_string(status()));
                        } else {
                            write_backup_metadata();
                        }
                        release_ref();
                    },
                    0,
                    std::chrono::seconds(10));
            } else {
                LOG_ERROR("{}: block service create file failed, file = {}, err = {}",
                          name,
                          metadata,
                          resp.err);
                // allow a later attempt to write the metadata again
                _have_write_backup_metadata.store(false);
                fail_upload("create file failed");
            }
            // pairs with the add_ref() made before create_file()
            release_ref();
            return;
        });
}
| |
// Writes `value` as the content of the remote "current checkpoint" file; a successful
// write finishes the backup via complete_upload().
//
// The per-file bookkeeping (_metadata.files, _file_infos, _file_status) is dropped first.
// Then, if the status is still ready for upload, the remote file is created/opened:
//   - ERR_OK -> on_write() a copy of `value`; on success complete_upload()
//   - ERR_TIMEOUT -> retry this function after 10 seconds (if still ready for upload)
//   - any other error -> fail_upload()
//
// add_ref() precedes every asynchronous hop; the callback/task releases it as its last step.
void cold_backup_context::write_current_chkpt_file(const std::string &value)
{
    // before writing the current checkpoint file, the memory occupied by _metadata,
    // _file_status and _file_infos can be released: even if writing the current checkpoint
    // file fails, backup_metadata has been uploaded successfully, so nothing is re-uploaded
    _metadata.files.clear();
    _file_infos.clear();
    _file_status.clear();

    if (!is_ready_for_upload()) {
        LOG_INFO("{}: backup status has changed to {}, stop write current checkpoint file",
                 name,
                 cold_backup_status_to_string(status()));
        return;
    }

    std::string current_chkpt_file = cold_backup::get_current_chkpt_file(
        backup_root, request.app_name, request.pid, request.backup_id);
    dist::block_service::create_file_request req;
    req.file_name = current_chkpt_file;
    req.ignore_metadata = true;

    add_ref();

    block_service->create_file(
        std::move(req),
        LPC_BACKGROUND_COLD_BACKUP,
        [this, value, current_chkpt_file](const dist::block_service::create_file_response &resp) {
            if (resp.err == ERR_OK) {
                CHECK_NOTNULL(resp.file_handle, "");
                // copy `value` (without a trailing NUL) into a shared buffer owned by the blob
                auto len = value.length();
                std::shared_ptr<char> buf = utils::make_shared_array<char>(len);
                ::memcpy(buf.get(), value.c_str(), len);
                blob write_buf(std::move(buf), static_cast<unsigned int>(len));
                LOG_INFO("{}: create current checkpoint file succeed, start write file ,file = {}",
                         name,
                         current_chkpt_file);
                // hold this context until the on_write callback below has executed
                add_ref();
                this->on_write(resp.file_handle, write_buf, [this](bool succeed) {
                    if (succeed) {
                        complete_upload();
                    }
                    release_ref();
                });
            } else if (resp.err == ERR_TIMEOUT) {
                LOG_ERROR("{}: block file create file timeout, retry after 10s, file = {}",
                          name,
                          current_chkpt_file);
                // held by the delayed retry task below, which releases it when done
                add_ref();

                tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
                                 nullptr,
                                 [this, value]() {
                                     if (!is_ready_for_upload()) {
                                         LOG_INFO("{}: backup status has changed to {}, stop write "
                                                  "current checkpoint file",
                                                  name,
                                                  cold_backup_status_to_string(status()));
                                     } else {
                                         write_current_chkpt_file(value);
                                     }

                                     release_ref();
                                 },
                                 0,
                                 std::chrono::seconds(10));
            } else {
                LOG_ERROR("{}: block service create file failed, file = {}, err = {}",
                          name,
                          current_chkpt_file,
                          resp.err);
                fail_upload("create file failed");
            }
            // pairs with the add_ref() made before create_file()
            release_ref();
            return;
        });
}
| |
| void cold_backup_context::on_write(const dist::block_service::block_file_ptr &file_handle, |
| const blob &value, |
| const std::function<void(bool)> &callback) |
| { |
| CHECK_NOTNULL(file_handle, ""); |
| dist::block_service::write_request req; |
| req.buffer = value; |
| |
| add_ref(); |
| |
| file_handle->write( |
| std::move(req), |
| LPC_BACKGROUND_COLD_BACKUP, |
| [this, value, file_handle, callback](const dist::block_service::write_response &resp) { |
| if (resp.err == ERR_OK) { |
| LOG_INFO( |
| "{}: write remote file succeed, file = {}", name, file_handle->file_name()); |
| callback(true); |
| } else if (resp.err == ERR_TIMEOUT) { |
| LOG_INFO("{}: write remote file timeout, retry after 10s, file = {}", |
| name, |
| file_handle->file_name()); |
| add_ref(); |
| |
| tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, |
| nullptr, |
| [this, file_handle, value, callback]() { |
| if (!is_ready_for_upload()) { |
| LOG_INFO("{}: backup status has changed to {}, stop write " |
| "remote file, file = {}", |
| name, |
| cold_backup_status_to_string(status()), |
| file_handle->file_name()); |
| } else { |
| on_write(file_handle, value, callback); |
| } |
| release_ref(); |
| }, |
| 0, |
| std::chrono::seconds(10)); |
| } else { |
| // here, must call the callback to release_ref |
| callback(false); |
| LOG_ERROR("{}: write remote file failed, file = {}, err = {}", |
| name, |
| file_handle->file_name(), |
| resp.err); |
| fail_upload("write remote file failed"); |
| } |
| release_ref(); |
| return; |
| }); |
| } |
| |
| void cold_backup_context::on_upload_file_complete(const std::string &local_filename) |
| { |
| const int64_t &f_size = _file_infos.at(local_filename).first; |
| _upload_file_size.fetch_add(f_size); |
| file_upload_complete(local_filename); |
| if (_owner_replica != nullptr) { |
| METRIC_INCREMENT(*_owner_replica, backup_file_upload_successful_count); |
| METRIC_INCREMENT_BY(*_owner_replica, backup_file_upload_total_bytes, f_size); |
| } |
| // update progress |
| // int a = 10; int b = 3; then b/a = 0; |
| // double a = 10; double b = 3; then b/a = 0.3 |
| auto total = static_cast<double>(checkpoint_file_total_size); |
| auto complete_size = static_cast<double>(_upload_file_size.load()); |
| |
| if (total <= complete_size) { |
| LOG_INFO("{}: upload checkpoint to remote complete, checkpoint dir = {}, total file size " |
| "= {}, file count = {}", |
| name, |
| checkpoint_dir, |
| total, |
| checkpoint_files.size()); |
| bool old_status = false; |
| if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) { |
| write_backup_metadata(); |
| } |
| return; |
| } else { |
| CHECK_GT(total, 0.0); |
| update_progress(static_cast<int>(complete_size / total * 1000)); |
| LOG_INFO("{}: the progress of upload checkpoint is {}", name, _progress.load()); |
| } |
| if (is_ready_for_upload()) { |
| std::vector<std::string> upload_files; |
| upload_complete_or_fetch_uncomplete_files(upload_files); |
| for (auto &file : upload_files) { |
| LOG_INFO("{}: start upload checkpoint file to remote, file = {}", name, file); |
| upload_file(file); |
| } |
| } |
| } |
| |
| bool cold_backup_context::upload_complete_or_fetch_uncomplete_files(std::vector<std::string> &files) |
| { |
| bool upload_complete = false; |
| |
| zauto_lock l(_lock); |
| if (_file_remain_cnt > 0 && _cur_upload_file_cnt < _max_concurrent_uploading_file_cnt) { |
| for (const auto &_pair : _file_status) { |
| if (_pair.second == file_status::FileUploadUncomplete) { |
| files.emplace_back(_pair.first); |
| _file_remain_cnt -= 1; |
| _file_status[_pair.first] = file_status::FileUploading; |
| _cur_upload_file_cnt += 1; |
| } |
| if (_file_remain_cnt <= 0 || |
| _cur_upload_file_cnt >= _max_concurrent_uploading_file_cnt) { |
| break; |
| } |
| } |
| } |
| if (_file_remain_cnt <= 0 && _cur_upload_file_cnt <= 0) { |
| upload_complete = true; |
| } |
| return upload_complete; |
| } |
| |
| void cold_backup_context::file_upload_uncomplete(const std::string &filename) |
| { |
| zauto_lock l(_lock); |
| |
| CHECK_GE(_cur_upload_file_cnt, 1); |
| _cur_upload_file_cnt -= 1; |
| _file_remain_cnt += 1; |
| _file_status[filename] = file_status::FileUploadUncomplete; |
| } |
| |
| void cold_backup_context::file_upload_complete(const std::string &filename) |
| { |
| zauto_lock l(_lock); |
| |
| CHECK_GE(_cur_upload_file_cnt, 1); |
| _cur_upload_file_cnt -= 1; |
| _file_status[filename] = file_status::FileUploadComplete; |
| } |
| |
| } // namespace replication |
| } // namespace dsn |