| /* |
| * The MIT License (MIT) |
| * |
| * Copyright (c) 2015 Microsoft Corporation |
| * |
| * -=- Robust Distributed System Nucleus (rDSN) -=- |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to deal |
| * in the Software without restriction, including without limitation the rights |
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| * copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| * THE SOFTWARE. |
| */ |
| |
| #include "replica.h" |
| |
| #include <absl/strings/string_view.h> |
| #include <fmt/core.h> |
| #include <rocksdb/status.h> |
| #include <functional> |
| #include <vector> |
| |
| #include "backup/replica_backup_manager.h" |
| #include "bulk_load/replica_bulk_loader.h" |
| #include "common/backup_common.h" |
| #include "common/fs_manager.h" |
| #include "common/gpid.h" |
| #include "common/replica_envs.h" |
| #include "common/replication_enums.h" |
| #include "consensus_types.h" |
| #include "duplication/replica_duplicator_manager.h" |
| #include "duplication/replica_follower.h" |
| #include "mutation.h" |
| #include "mutation_log.h" |
| #include "replica/prepare_list.h" |
| #include "replica/replica_context.h" |
| #include "replica/replication_app_base.h" |
| #include "replica_admin_types.h" |
| #include "replica_disk_migrator.h" |
| #include "replica_stub.h" |
| #include "runtime/rpc/rpc_message.h" |
| #include "security/access_controller.h" |
| #include "split/replica_split_manager.h" |
| #include "utils/filesystem.h" |
| #include "utils/fmt_logging.h" |
| #include "utils/latency_tracer.h" |
| #include "utils/ports.h" |
| #include "utils/rand.h" |
| |
// Per-replica metric definitions. Each metric declared here is instantiated for every
// replica instance in the replica constructor below (via METRIC_VAR_INIT_replica).
METRIC_DEFINE_gauge_int64(replica,
                          private_log_size_mb,
                          dsn::metric_unit::kMegaBytes,
                          "The size of private log in MB");

METRIC_DEFINE_counter(replica,
                      throttling_delayed_write_requests,
                      dsn::metric_unit::kRequests,
                      "The number of delayed write requests by throttling");

METRIC_DEFINE_counter(replica,
                      throttling_rejected_write_requests,
                      dsn::metric_unit::kRequests,
                      "The number of rejected write requests by throttling");

METRIC_DEFINE_counter(replica,
                      throttling_delayed_read_requests,
                      dsn::metric_unit::kRequests,
                      "The number of delayed read requests by throttling");

METRIC_DEFINE_counter(replica,
                      throttling_rejected_read_requests,
                      dsn::metric_unit::kRequests,
                      "The number of rejected read requests by throttling");

METRIC_DEFINE_counter(replica,
                      backup_requests,
                      dsn::metric_unit::kRequests,
                      "The number of backup requests");

METRIC_DEFINE_counter(replica,
                      throttling_delayed_backup_requests,
                      dsn::metric_unit::kRequests,
                      "The number of delayed backup requests by throttling");

METRIC_DEFINE_counter(replica,
                      throttling_rejected_backup_requests,
                      dsn::metric_unit::kRequests,
                      "The number of rejected backup requests by throttling");

METRIC_DEFINE_counter(replica,
                      splitting_rejected_write_requests,
                      dsn::metric_unit::kRequests,
                      "The number of rejected write requests by splitting");

METRIC_DEFINE_counter(replica,
                      splitting_rejected_read_requests,
                      dsn::metric_unit::kRequests,
                      "The number of rejected read requests by splitting");

METRIC_DEFINE_counter(replica,
                      bulk_load_ingestion_rejected_write_requests,
                      dsn::metric_unit::kRequests,
                      "The number of rejected write requests by bulk load ingestion");

METRIC_DEFINE_counter(replica,
                      dup_rejected_non_idempotent_write_requests,
                      dsn::metric_unit::kRequests,
                      "The number of rejected non-idempotent write requests by duplication");

METRIC_DEFINE_counter(
    replica,
    learn_count,
    dsn::metric_unit::kLearns,
    "The number of learns launched by learner (i.e. potential secondary replica)");

METRIC_DEFINE_counter(replica,
                      learn_rounds,
                      dsn::metric_unit::kRounds,
                      "The number of learn rounds launched by learner (during a learn there might"
                      "be multiple rounds)");

METRIC_DEFINE_counter(replica,
                      learn_copy_files,
                      dsn::metric_unit::kFiles,
                      "The number of files that are copied from learnee (i.e. primary replica)");

METRIC_DEFINE_counter(replica,
                      learn_copy_file_bytes,
                      dsn::metric_unit::kBytes,
                      "The size of file that are copied from learnee");

METRIC_DEFINE_counter(replica,
                      learn_copy_buffer_bytes,
                      dsn::metric_unit::kBytes,
                      "The size of data that are copied from learnee's buffer");

METRIC_DEFINE_counter(replica,
                      learn_lt_cache_responses,
                      dsn::metric_unit::kResponses,
                      "The number of learn responses of LT_CACHE type decided by learner, with "
                      "each learn response related to an RPC_LEARN request");

METRIC_DEFINE_counter(replica,
                      learn_lt_app_responses,
                      dsn::metric_unit::kResponses,
                      "The number of learn responses of LT_APP type decided by learner, with each "
                      "learn response related to an RPC_LEARN request");

METRIC_DEFINE_counter(replica,
                      learn_lt_log_responses,
                      dsn::metric_unit::kResponses,
                      "The number of learn responses of LT_LOG type decided by learner, with each "
                      "learn response related to an RPC_LEARN request");

METRIC_DEFINE_counter(replica,
                      learn_resets,
                      dsn::metric_unit::kResets,
                      "The number of times learner resets its local state (since its local state "
                      "is newer than learnee's), with each reset related to an learn response of "
                      "an RPC_LEARN request");

METRIC_DEFINE_counter(replica,
                      learn_failed_count,
                      dsn::metric_unit::kLearns,
                      "The number of failed learns launched by learner");

METRIC_DEFINE_counter(replica,
                      learn_successful_count,
                      dsn::metric_unit::kLearns,
                      "The number of successful learns launched by learner");

METRIC_DEFINE_counter(replica,
                      prepare_failed_requests,
                      dsn::metric_unit::kRequests,
                      "The number of failed RPC_PREPARE requests");

METRIC_DEFINE_counter(replica,
                      group_check_failed_requests,
                      dsn::metric_unit::kRequests,
                      "The number of failed RPC_GROUP_CHECK requests launched by primary replicas");

METRIC_DEFINE_counter(replica,
                      emergency_checkpoints,
                      dsn::metric_unit::kCheckpoints,
                      "The number of triggered emergency checkpoints");

METRIC_DEFINE_counter(replica,
                      write_size_exceed_threshold_requests,
                      dsn::metric_unit::kRequests,
                      "The number of write requests whose size exceeds threshold");

METRIC_DEFINE_counter(replica,
                      backup_started_count,
                      dsn::metric_unit::kBackups,
                      "The number of started backups");

METRIC_DEFINE_counter(replica,
                      backup_failed_count,
                      dsn::metric_unit::kBackups,
                      "The number of failed backups");

METRIC_DEFINE_counter(replica,
                      backup_successful_count,
                      dsn::metric_unit::kBackups,
                      "The number of successful backups");

METRIC_DEFINE_counter(replica,
                      backup_cancelled_count,
                      dsn::metric_unit::kBackups,
                      "The number of cancelled backups");

METRIC_DEFINE_counter(replica,
                      backup_file_upload_failed_count,
                      dsn::metric_unit::kFileUploads,
                      "The number of failed file uploads for backups");

METRIC_DEFINE_counter(replica,
                      backup_file_upload_successful_count,
                      dsn::metric_unit::kFileUploads,
                      "The number of successful file uploads for backups");

METRIC_DEFINE_counter(replica,
                      backup_file_upload_total_bytes,
                      dsn::metric_unit::kBytes,
                      "The total size of uploaded files for backups");
| |
| namespace dsn { |
| namespace replication { |
| |
// Configuration flags for the replication pipeline.
DSN_DEFINE_bool(replication,
                batch_write_disabled,
                false,
                "whether to disable auto-batch of replicated write requests");
DSN_DEFINE_int32(replication,
                 staleness_for_commit,
                 10,
                 "how many concurrent two phase commit rounds are allowed");
DSN_DEFINE_int32(replication,
                 max_mutation_count_in_prepare_list,
                 110,
                 "maximum number of mutations in prepare list");
// The prepare list must be able to hold at least 'staleness_for_commit' in-flight
// mutations; reject configurations where it cannot.
DSN_DEFINE_group_validator(max_mutation_count_in_prepare_list, [](std::string &message) -> bool {
    if (FLAGS_max_mutation_count_in_prepare_list < FLAGS_staleness_for_commit) {
        message = fmt::format("replication.max_mutation_count_in_prepare_list({}) should be >= "
                              "replication.staleness_for_commit({})",
                              FLAGS_max_mutation_count_in_prepare_list,
                              FLAGS_staleness_for_commit);
        return false;
    }
    return true;
});

// Defined elsewhere; used by update_last_checkpoint_generate_time() below.
DSN_DECLARE_int32(checkpoint_max_interval_hours);
| |
// Name of the file (under the replica dir) in which the replica persists its app_info.
const std::string replica::kAppInfo = ".app-info";
| |
// Constructs a replica bound to partition 'gpid' of app 'app', hosted on disk
// node 'dn' and managed by 'stub'.
//
// @param stub                    owning replica_stub; must not be null.
// @param gpid                    global partition id of this replica.
// @param app                     application metadata this replica serves.
// @param dn                      disk node hosting the replica dir; must not be null.
// @param need_restore            if true, an extra env is injected to force restore
//                                from a backup on startup.
// @param is_duplication_follower whether this replica is a follower in cross-cluster
//                                duplication.
replica::replica(replica_stub *stub,
                 gpid gpid,
                 const app_info &app,
                 dir_node *dn,
                 bool need_restore,
                 bool is_duplication_follower)
    : serverlet<replica>("replica"),
      replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address_str), app.app_name),
      _app_info(app),
      _primary_states(gpid, FLAGS_staleness_for_commit, FLAGS_batch_write_disabled),
      _potential_secondary_states(this),
      _chkpt_total_size(0),
      _cur_download_size(0),
      _restore_progress(0),
      _restore_status(ERR_OK),
      _duplication_mgr(new replica_duplicator_manager(this)),
      // todo(jiashuo1): app.duplicating need rename
      _is_duplication_master(app.duplicating),
      _is_duplication_follower(is_duplication_follower),
      _backup_mgr(new replica_backup_manager(this)),
      // Instantiate all per-replica metrics declared at the top of this file.
      METRIC_VAR_INIT_replica(private_log_size_mb),
      METRIC_VAR_INIT_replica(throttling_delayed_write_requests),
      METRIC_VAR_INIT_replica(throttling_rejected_write_requests),
      METRIC_VAR_INIT_replica(throttling_delayed_read_requests),
      METRIC_VAR_INIT_replica(throttling_rejected_read_requests),
      METRIC_VAR_INIT_replica(backup_requests),
      METRIC_VAR_INIT_replica(throttling_delayed_backup_requests),
      METRIC_VAR_INIT_replica(throttling_rejected_backup_requests),
      METRIC_VAR_INIT_replica(splitting_rejected_write_requests),
      METRIC_VAR_INIT_replica(splitting_rejected_read_requests),
      METRIC_VAR_INIT_replica(bulk_load_ingestion_rejected_write_requests),
      METRIC_VAR_INIT_replica(dup_rejected_non_idempotent_write_requests),
      METRIC_VAR_INIT_replica(learn_count),
      METRIC_VAR_INIT_replica(learn_rounds),
      METRIC_VAR_INIT_replica(learn_copy_files),
      METRIC_VAR_INIT_replica(learn_copy_file_bytes),
      METRIC_VAR_INIT_replica(learn_copy_buffer_bytes),
      METRIC_VAR_INIT_replica(learn_lt_cache_responses),
      METRIC_VAR_INIT_replica(learn_lt_app_responses),
      METRIC_VAR_INIT_replica(learn_lt_log_responses),
      METRIC_VAR_INIT_replica(learn_resets),
      METRIC_VAR_INIT_replica(learn_failed_count),
      METRIC_VAR_INIT_replica(learn_successful_count),
      METRIC_VAR_INIT_replica(prepare_failed_requests),
      METRIC_VAR_INIT_replica(group_check_failed_requests),
      METRIC_VAR_INIT_replica(emergency_checkpoints),
      METRIC_VAR_INIT_replica(write_size_exceed_threshold_requests),
      METRIC_VAR_INIT_replica(backup_started_count),
      METRIC_VAR_INIT_replica(backup_failed_count),
      METRIC_VAR_INIT_replica(backup_successful_count),
      METRIC_VAR_INIT_replica(backup_cancelled_count),
      METRIC_VAR_INIT_replica(backup_file_upload_failed_count),
      METRIC_VAR_INIT_replica(backup_file_upload_successful_count),
      METRIC_VAR_INIT_replica(backup_file_upload_total_bytes)
{
    // A replica cannot function without a valid app type, stub and disk node.
    CHECK(!_app_info.app_type.empty(), "");
    CHECK_NOTNULL(stub, "");
    _stub = stub;
    CHECK_NOTNULL(dn, "");
    _dir_node = dn;
    _dir = dn->replica_dir(_app_info.app_type, gpid);
    _options = &stub->options();
    init_state();
    _config.pid = gpid;
    // Sub-managers for bulk load, partition split, disk migration and duplication
    // following; all hold a back-pointer to this replica.
    _bulk_loader = std::make_unique<replica_bulk_loader>(this);
    _split_mgr = std::make_unique<replica_split_manager>(this);
    _disk_migrator = std::make_unique<replica_disk_migrator>(this);
    _replica_follower = std::make_unique<replica_follower>(this);

    if (need_restore) {
        // add an extra env for restore
        _extra_envs.insert(
            std::make_pair(backup_restore_constant::FORCE_RESTORE, std::string("true")));
    }

    _access_controller = security::create_replica_access_controller(name());
}
| |
| void replica::update_last_checkpoint_generate_time() |
| { |
| _last_checkpoint_generate_time_ms = dsn_now_ms(); |
| uint64_t max_interval_ms = FLAGS_checkpoint_max_interval_hours * 3600000UL; |
| // use random trigger time to avoid flush peek |
| _next_checkpoint_interval_trigger_time_ms = |
| _last_checkpoint_generate_time_ms + rand::next_u64(max_interval_ms / 2, max_interval_ms); |
| } |
| |
| void replica::init_state() |
| { |
| _inactive_is_transient = false; |
| _is_initializing = false; |
| _prepare_list = std::make_unique<prepare_list>( |
| this, |
| 0, |
| FLAGS_max_mutation_count_in_prepare_list, |
| std::bind(&replica::execute_mutation, this, std::placeholders::_1)); |
| |
| _config.ballot = 0; |
| _config.pid.set_app_id(0); |
| _config.pid.set_partition_index(0); |
| _config.status = partition_status::PS_INACTIVE; |
| _primary_states.membership.ballot = 0; |
| _create_time_ms = dsn_now_ms(); |
| _last_config_change_time_ms = _create_time_ms; |
| update_last_checkpoint_generate_time(); |
| _private_log = nullptr; |
| get_bool_envs(_app_info.envs, replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND, _allow_ingest_behind); |
| } |
| |
| replica::~replica(void) |
| { |
| close(); |
| _prepare_list = nullptr; |
| LOG_DEBUG_PREFIX("replica destroyed"); |
| } |
| |
// Handles a client read request.
//
// The request passes through, in order: ACL check, deny-client gate, split
// rejection, replica-status checks, throttling, and finally the storage engine.
// Replies with an error code on rejection, or stays silent where timing out the
// client is the intended back-pressure mechanism.
//
// @param request          the incoming read request message.
// @param ignore_throttling if true, read/backup throttling is bypassed.
void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
{
    if (!_access_controller->allowed(request, ranger::access_type::kRead)) {
        response_client_read(request, ERR_ACL_DENY);
        return;
    }

    if (_deny_client.read) {
        if (_deny_client.reconfig) {
            // return ERR_INVALID_STATE will trigger client update config immediately
            response_client_read(request, ERR_INVALID_STATE);
            return;
        }
        // Do not reply any message to the peer client to let it timeout, it's OK coz some users
        // may retry immediately when they got a not success code which will make the server side
        // pressure more and more heavy.
        return;
    }

    // Reject (and count) reads that arrive while this partition is splitting.
    CHECK_REQUEST_IF_SPLITTING(read);

    if (status() == partition_status::PS_INACTIVE ||
        status() == partition_status::PS_POTENTIAL_SECONDARY) {
        response_client_read(request, ERR_INVALID_STATE);
        return;
    }

    if (!request->is_backup_request()) {
        // only backup request is allowed to read from a stale replica

        if (!ignore_throttling && throttle_read_request(request)) {
            return;
        }

        if (status() != partition_status::PS_PRIMARY) {
            response_client_read(request, ERR_INVALID_STATE);
            return;
        }

        // a small window where the state is not the latest yet
        if (last_committed_decree() < _primary_states.last_prepare_decree_on_new_primary) {
            LOG_ERROR_PREFIX("last_committed_decree({}) < last_prepare_decree_on_new_primary({})",
                             last_committed_decree(),
                             _primary_states.last_prepare_decree_on_new_primary);
            response_client_read(request, ERR_INVALID_STATE);
            return;
        }
    } else {
        if (!ignore_throttling && throttle_backup_request(request)) {
            return;
        }
        METRIC_VAR_INCREMENT(backup_requests);
    }

    CHECK(_app, "");
    auto storage_error = _app->on_request(request);
    // kNotFound is normal, it indicates that the key is not found (including expired)
    // in the storage engine, so just ignore it.
    if (dsn_unlikely(storage_error != rocksdb::Status::kOk &&
                     storage_error != rocksdb::Status::kNotFound)) {
        switch (storage_error) {
        // TODO(yingchun): Now only kCorruption and kIOError are dealt, consider to deal with
        // more storage engine errors.
        case rocksdb::Status::kCorruption:
            handle_local_failure(ERR_RDB_CORRUPTION);
            break;
        case rocksdb::Status::kIOError:
            handle_local_failure(ERR_DISK_IO_ERROR);
            break;
        default:
            LOG_ERROR_PREFIX("client read encountered an unhandled error: {}", storage_error);
        }
        return;
    }
}
| |
// Replies to a client read request with 'error' (is_read = true).
void replica::response_client_read(dsn::message_ex *request, error_code error)
{
    _stub->response_client(get_gpid(), true, request, status(), error);
}
| |
// Replies to a client write request with 'error' (is_read = false).
void replica::response_client_write(dsn::message_ex *request, error_code error)
{
    _stub->response_client(get_gpid(), false, request, status(), error);
}
| |
// Asserts the fundamental decree invariant: prepared >= committed >= durable.
void replica::check_state_completeness()
{
    /* prepare commit durable */
    CHECK_GE(max_prepared_decree(), last_committed_decree());
    CHECK_GE(last_committed_decree(), last_durable_decree());
}
| |
// Applies a committed mutation to the storage engine, with behavior depending on
// the current partition status:
//   - PS_INACTIVE: apply only if it is exactly the next decree.
//   - PS_PRIMARY / PS_SECONDARY (not checkpointing): must be the next decree; apply.
//   - PS_SECONDARY (checkpointing) / PS_POTENTIAL_SECONDARY (still learning):
//     skip applying; the private log keeps the state for later catch-up.
//   - PS_PARTITION_SPLIT: apply only once the split child has caught up.
// Any apply failure is routed to handle_local_failure(). On a primary, also
// pulls the next batch of work from the write queue.
void replica::execute_mutation(mutation_ptr &mu)
{
    LOG_DEBUG_PREFIX(
        "execute mutation {}: request_count = {}", mu->name(), mu->client_requests.size());

    error_code err = ERR_OK;
    decree d = mu->data.header.decree;

    switch (status()) {
    case partition_status::PS_INACTIVE:
        if (_app->last_committed_decree() + 1 == d) {
            err = _app->apply_mutation(mu);
        } else {
            LOG_DEBUG_PREFIX("mutation {} commit to {} skipped, app.last_committed_decree = {}",
                             mu->name(),
                             enum_to_string(status()),
                             _app->last_committed_decree());
        }
        break;
    case partition_status::PS_PRIMARY: {
        ADD_POINT(mu->_tracer);
        check_state_completeness();
        CHECK_EQ(_app->last_committed_decree() + 1, d);
        err = _app->apply_mutation(mu);
    } break;

    case partition_status::PS_SECONDARY:
        if (!_secondary_states.checkpoint_is_running) {
            check_state_completeness();
            CHECK_EQ(_app->last_committed_decree() + 1, d);
            err = _app->apply_mutation(mu);
        } else {
            LOG_DEBUG_PREFIX("mutation {} commit to {} skipped, app.last_committed_decree = {}",
                             mu->name(),
                             enum_to_string(status()),
                             _app->last_committed_decree());

            // make sure private log saves the state
            // catch-up will be done later after checkpoint task is finished
            CHECK_NOTNULL(_private_log, "");
        }
        break;
    case partition_status::PS_POTENTIAL_SECONDARY:
        if (_potential_secondary_states.learning_status == learner_status::LearningSucceeded ||
            _potential_secondary_states.learning_status ==
                learner_status::LearningWithPrepareTransient) {
            CHECK_EQ(_app->last_committed_decree() + 1, d);
            err = _app->apply_mutation(mu);
        } else {
            LOG_DEBUG_PREFIX("mutation {} commit to {} skipped, app.last_committed_decree = {}",
                             mu->name(),
                             enum_to_string(status()),
                             _app->last_committed_decree());

            // prepare also happens with learner_status::LearningWithPrepare, in this case
            // make sure private log saves the state,
            // catch-up will be done later after the checkpoint task is finished
            CHECK_NOTNULL(_private_log, "");
        }
        break;
    case partition_status::PS_PARTITION_SPLIT:
        if (_split_states.is_caught_up) {
            CHECK_EQ(_app->last_committed_decree() + 1, d);
            err = _app->apply_mutation(mu);
        }
        break;
    case partition_status::PS_ERROR:
        break;
    default:
        CHECK(false, "invalid partition_status, status = {}", enum_to_string(status()));
    }

    LOG_DEBUG_PREFIX("TwoPhaseCommit, mutation {} committed, err = {}", mu->name(), err);

    if (err != ERR_OK) {
        handle_local_failure(err);
    }

    // Only the primary drives the two-phase-commit pipeline forward.
    if (status() != partition_status::PS_PRIMARY) {
        return;
    }

    ADD_CUSTOM_POINT(mu->_tracer, "completed");
    // The prepare list has freed one slot; see if a queued write can be prepared now.
    auto next = _primary_states.write_queue.check_possible_work(
        static_cast<int>(_prepare_list->max_decree() - d));

    if (next != nullptr) {
        init_prepare(next, false);
    }
}
| |
| mutation_ptr replica::new_mutation(decree decree) |
| { |
| mutation_ptr mu(new mutation()); |
| mu->data.header.pid = get_gpid(); |
| mu->data.header.ballot = get_ballot(); |
| mu->data.header.decree = decree; |
| mu->data.header.log_offset = invalid_offset; |
| return mu; |
| } |
| |
// The last decree made durable by the storage engine (delegates to the app).
decree replica::last_durable_decree() const { return _app->last_durable_decree(); }
| |
| decree replica::last_prepared_decree() const |
| { |
| ballot lastBallot = 0; |
| decree start = last_committed_decree(); |
| while (true) { |
| auto mu = _prepare_list->get_mutation_by_decree(start + 1); |
| if (mu == nullptr || mu->data.header.ballot < lastBallot || !mu->is_logged()) |
| break; |
| |
| start++; |
| lastBallot = mu->data.header.ballot; |
| } |
| return start; |
| } |
| |
// Whether verbose commit logging is enabled stub-wide.
bool replica::verbose_commit_log() const { return _stub->_verbose_commit_log; }
| |
// Tears the replica down: cancels timers and outstanding tasks, cleans all
// per-role contexts, closes the private log and the storage app, finalizes any
// in-flight disk migration, and releases the sub-managers. Only legal from
// PS_ERROR/PS_INACTIVE (or an idle/moved disk migration), which the leading
// check enforces. The teardown order below matters: tasks are cancelled before
// contexts are verified clean, and managers with ongoing tasks are released
// before the replica itself is destroyed.
void replica::close()
{
    CHECK_PREFIX_MSG(status() == partition_status::PS_ERROR ||
                         status() == partition_status::PS_INACTIVE ||
                         _disk_migrator->status() == disk_migration_status::IDLE ||
                         _disk_migrator->status() >= disk_migration_status::MOVED,
                     "invalid state(partition_status={}, migration_status={}) when calling "
                     "replica close",
                     enum_to_string(status()),
                     enum_to_string(_disk_migrator->status()));

    uint64_t start_time = dsn_now_ms();

    if (_checkpoint_timer != nullptr) {
        _checkpoint_timer->cancel(true);
        _checkpoint_timer = nullptr;
    }

    _tracker.cancel_outstanding_tasks();

    cleanup_preparing_mutations(true);
    CHECK(_primary_states.is_cleaned(), "primary context is not cleared");

    if (partition_status::PS_INACTIVE == status()) {
        // An inactive replica must already have clean contexts.
        CHECK(_secondary_states.is_cleaned(), "secondary context is not cleared");
        CHECK(_potential_secondary_states.is_cleaned(),
              "potential secondary context is not cleared");
        CHECK(_split_states.is_cleaned(), "partition split context is not cleared");
    }

    // for partition_status::PS_ERROR, context cleanup is done here as they may block
    else {
        CHECK_PREFIX_MSG(_secondary_states.cleanup(true), "secondary context is not cleared");
        CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
                         "potential secondary context is not cleared");
        CHECK_PREFIX_MSG(_split_states.cleanup(true), "partition split context is not cleared");
    }

    if (_private_log != nullptr) {
        _private_log->close();
        _private_log = nullptr;
    }

    if (_app != nullptr) {
        // Move the app out first so _app is null even if close() fails.
        std::unique_ptr<replication_app_base> tmp_app = std::move(_app);
        error_code err = tmp_app->close(false);
        if (err != dsn::ERR_OK) {
            LOG_WARNING_PREFIX("close app failed, err = {}", err);
        }
    }

    if (_disk_migrator->status() == disk_migration_status::MOVED) {
        // this will update disk_migration_status::MOVED->disk_migration_status::CLOSED
        _disk_migrator->update_replica_dir();
    } else if (_disk_migrator->status() == disk_migration_status::CLOSED) {
        _disk_migrator.reset();
    }

    // duplication_impl may have ongoing tasks.
    // release it before release replica.
    _duplication_mgr.reset();

    _backup_mgr.reset();

    _bulk_loader.reset();

    _split_mgr.reset();

    LOG_INFO_PREFIX("replica closed, time_used = {} ms", dsn_now_ms() - start_time);
}
| |
// Human-readable manual-compaction state, delegated to the storage app.
std::string replica::query_manual_compact_state() const
{
    CHECK_PREFIX(_app);
    return _app->query_compact_state();
}
| |
// Current manual-compaction status, delegated to the storage app.
manual_compaction_status::type replica::get_manual_compact_status() const
{
    CHECK_PREFIX(_app);
    return _app->query_compact_status();
}
| |
| void replica::on_detect_hotkey(const detect_hotkey_request &req, detect_hotkey_response &resp) |
| { |
| _app->on_detect_hotkey(req, resp); |
| } |
| |
// Data (schema) version of the storage app.
uint32_t replica::query_data_version() const
{
    CHECK_PREFIX(_app);
    return _app->query_data_version();
}
| |
| error_code replica::store_app_info(app_info &info, const std::string &path) |
| { |
| replica_app_info new_info((app_info *)&info); |
| const auto &info_path = path.empty() ? utils::filesystem::path_combine(_dir, kAppInfo) : path; |
| auto err = new_info.store(info_path); |
| if (dsn_unlikely(err != ERR_OK)) { |
| LOG_ERROR_PREFIX("failed to save app_info to {}, error = {}", info_path, err); |
| } |
| return err; |
| } |
| |
| bool replica::access_controller_allowed(message_ex *msg, const ranger::access_type &ac_type) const |
| { |
| return !_access_controller->is_enable_ranger_acl() || _access_controller->allowed(msg, ac_type); |
| } |
| |
// Current value of the backup_requests counter metric.
int64_t replica::get_backup_request_count() const { return METRIC_VAR_VALUE(backup_requests); }
| |
// Refreshes the dup_pending_mutations metric from the duplication manager.
// The function name is generated by the METRIC_FUNC_NAME_SET macro.
void replica::METRIC_FUNC_NAME_SET(dup_pending_mutations)()
{
    METRIC_SET(*_duplication_mgr, dup_pending_mutations);
}
| |
| } // namespace replication |
| } // namespace dsn |