| /* |
| * The MIT License (MIT) |
| * |
| * Copyright (c) 2015 Microsoft Corporation |
| * |
| * -=- Robust Distributed System Nucleus (rDSN) -=- |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to deal |
| * in the Software without restriction, including without limitation the rights |
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| * copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| * THE SOFTWARE. |
| */ |
| |
| /* |
| * Description: |
| * replica container - replica stub |
| * |
| * Revision history: |
| * Mar., 2015, @imzhenyu (Zhenyu Guo), first version |
| * xxxx-xx-xx, author, fix bug about xxx |
| */ |
| |
| #include <boost/algorithm/string/replace.hpp> |
| // IWYU pragma: no_include <ext/alloc_traits.h> |
| #include <fmt/core.h> |
| #include <fmt/ostream.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <algorithm> |
| #include <chrono> |
| #include <cstdint> |
| #include <deque> |
| #include <mutex> |
| #include <ostream> |
| #include <set> |
| #include <vector> |
| |
| #include "backup/replica_backup_server.h" |
| #include "bulk_load/replica_bulk_loader.h" |
| #include "common/backup_common.h" |
| #include "common/duplication_common.h" |
| #include "common/replication.codes.h" |
| #include "common/replication_enums.h" |
| #include "disk_cleaner.h" |
| #include "duplication/duplication_sync_timer.h" |
| #include "meta_admin_types.h" |
| #include "mutation.h" |
| #include "mutation_log.h" |
| #include "nfs/nfs_node.h" |
| #include "nfs_types.h" |
| #include "perf_counter/perf_counter.h" |
| #include "replica.h" |
| #include "replica/duplication/replica_follower.h" |
| #include "replica/log_file.h" |
| #include "replica/replica_context.h" |
| #include "replica/replica_stub.h" |
| #include "replica/replication_app_base.h" |
| #include "replica_disk_migrator.h" |
| #include "replica_stub.h" |
| #include "runtime/api_layer1.h" |
| #include "runtime/ranger/access_type.h" |
| #include "runtime/rpc/rpc_message.h" |
| #include "runtime/rpc/serialization.h" |
| #include "runtime/security/access_controller.h" |
| #include "runtime/task/async_calls.h" |
| #include "split/replica_split_manager.h" |
| #include "utils/command_manager.h" |
| #include "utils/filesystem.h" |
| #include "utils/fmt_logging.h" |
| #include "utils/ports.h" |
| #include "utils/process_utils.h" |
| #include "utils/rand.h" |
| #include "utils/string_conv.h" |
| #include "utils/string_view.h" |
| #include "utils/strings.h" |
| #include "utils/synchronize.h" |
| #ifdef DSN_ENABLE_GPERF |
| #include <gperftools/malloc_extension.h> |
| #elif defined(DSN_USE_JEMALLOC) |
| #include "utils/je_ctl.h" |
| #endif |
| #include "nfs/nfs_code_definition.h" |
| #include "remote_cmd/remote_command.h" |
| #include "utils/fail_point.h" |
| |
| namespace dsn { |
| namespace replication { |
| DSN_DEFINE_bool(replication, |
| deny_client_on_start, |
| false, |
| "whether to deny client read and write requests when starting the server"); |
| DSN_DEFINE_bool(replication, |
| verbose_client_log_on_start, |
| false, |
| "whether to print verbose error log when reply to client read and write requests " |
| "when starting the server"); |
| DSN_DEFINE_bool(replication, |
| mem_release_enabled, |
| true, |
| "whether to enable periodic memory release"); |
| DSN_DEFINE_bool(replication, |
| log_shared_force_flush, |
| false, |
| "when write shared log, whether to flush file after write done"); |
| DSN_DEFINE_bool(replication, gc_disabled, false, "whether to disable garbage collection"); |
| DSN_DEFINE_bool(replication, disk_stat_disabled, false, "whether to disable disk stat"); |
| DSN_DEFINE_bool(replication, |
| delay_for_fd_timeout_on_start, |
| false, |
| "whether to delay for beacon grace period to make failure detector timeout when " |
| "starting the server"); |
| DSN_DEFINE_bool(replication, |
| config_sync_disabled, |
| false, |
| "whether to disable replica configuration periodical sync with the meta server"); |
| DSN_DEFINE_bool(replication, fd_disabled, false, "whether to disable failure detection"); |
| DSN_DEFINE_bool(replication, |
| verbose_commit_log_on_start, |
| false, |
| "whether to print verbose log when commit mutation when starting the server"); |
| DSN_DEFINE_uint32(replication, |
| max_concurrent_manual_emergency_checkpointing_count, |
| 10, |
| "max concurrent manual emergency checkpoint running count"); |
| DSN_TAG_VARIABLE(max_concurrent_manual_emergency_checkpointing_count, FT_MUTABLE); |
| |
| DSN_DEFINE_uint32( |
| replication, |
| config_sync_interval_ms, |
| 30000, |
| "The interval milliseconds of replica server to syncs replica configuration with meta server"); |
| DSN_TAG_VARIABLE(config_sync_interval_ms, FT_MUTABLE); |
| DSN_DEFINE_validator(config_sync_interval_ms, [](uint32_t value) -> bool { return value > 0; }); |
| |
| DSN_DEFINE_int32(replication, |
| disk_stat_interval_seconds, |
| 600, |
| "every what period (ms) we do disk stat"); |
| DSN_DEFINE_int32(replication, |
| gc_memory_replica_interval_ms, |
| 10 * 60 * 1000, |
| "after closing a healthy replica (due to LB), the replica will remain in memory " |
| "for this long (ms) for quick recover"); |
| DSN_DEFINE_int32(replication, |
| log_shared_file_size_mb, |
| 32, |
| "shared log maximum segment file size (MB)"); |
| |
| DSN_DEFINE_int32(replication, log_shared_file_count_limit, 100, "shared log maximum file count"); |
| DSN_DEFINE_int32( |
| replication, |
| mem_release_check_interval_ms, |
| 3600000, |
| "the replica check if should release memory to the system every this period of time(ms)"); |
| DSN_DEFINE_int32( |
| replication, |
| mem_release_max_reserved_mem_percentage, |
| 10, |
| "if tcmalloc reserved but not-used memory exceed this percentage of application allocated " |
| "memory, replica server will release the exceeding memory back to operating system"); |
| |
| DSN_DECLARE_bool(duplication_enabled); |
| DSN_DECLARE_int32(fd_beacon_interval_seconds); |
| DSN_DECLARE_int32(fd_check_interval_seconds); |
| DSN_DECLARE_int32(fd_grace_seconds); |
| DSN_DECLARE_int32(fd_lease_seconds); |
| DSN_DECLARE_int32(gc_interval_ms); |
| |
| bool replica_stub::s_not_exit_on_log_failure = false; |
| |
// Constructs the stub with all background machinery stopped; initialize()
// must be called afterwards to load replicas and start timers.
// `subscriber` (optional) is invoked on replica state changes; the meaning
// of `is_long_subscriber` is not visible here -- presumably it selects an
// asynchronous delivery mode for the callback (confirm at the call sites).
replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
                           bool is_long_subscriber /* = true*/)
    : serverlet("replica_stub"),
      _deny_client(false),
      _verbose_client_log(false),
      _verbose_commit_log(false),
      _release_tcmalloc_memory(false),
      _mem_release_max_reserved_mem_percentage(10),
      _max_concurrent_bulk_load_downloading_count(5),
      _learn_app_concurrent_count(0),
      _bulk_load_downloading_count(0),
      _manual_emergency_checkpointing_count(0),
      _is_running(false)
{
#ifdef DSN_ENABLE_GPERF
    _is_releasing_memory = false;
#endif
    _replica_state_subscriber = subscriber;
    _is_long_subscriber = is_long_subscriber;
    _failure_detector = nullptr;
    // Start disconnected; initialize_start() transitions to NS_Connected
    // (directly, or via the failure detector callbacks).
    _state = NS_Disconnected;
    _log = nullptr;
    // Empty string until initialize() fills in the real primary address.
    _primary_address_str[0] = '\0';
    install_perf_counters();
}
| |
| replica_stub::~replica_stub(void) { close(); } |
| |
| void replica_stub::install_perf_counters() |
| { |
| _counter_replicas_count.init_app_counter( |
| "eon.replica_stub", "replica(Count)", COUNTER_TYPE_NUMBER, "# in replica_stub._replicas"); |
| _counter_replicas_opening_count.init_app_counter("eon.replica_stub", |
| "opening.replica(Count)", |
| COUNTER_TYPE_NUMBER, |
| "# in replica_stub._opening_replicas"); |
| _counter_replicas_closing_count.init_app_counter("eon.replica_stub", |
| "closing.replica(Count)", |
| COUNTER_TYPE_NUMBER, |
| "# in replica_stub._closing_replicas"); |
| _counter_replicas_commit_qps.init_app_counter("eon.replica_stub", |
| "replicas.commit.qps", |
| COUNTER_TYPE_RATE, |
| "server-level commit throughput"); |
| _counter_replicas_learning_count.init_app_counter("eon.replica_stub", |
| "replicas.learning.count", |
| COUNTER_TYPE_NUMBER, |
| "current learning count"); |
| _counter_replicas_learning_max_duration_time_ms.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.max.duration.time(ms)", |
| COUNTER_TYPE_NUMBER, |
| "current learning max duration time(ms)"); |
| _counter_replicas_learning_max_copy_file_size.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.max.copy.file.size", |
| COUNTER_TYPE_NUMBER, |
| "current learning max copy file size"); |
| _counter_replicas_learning_recent_start_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.start.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current learning start count in the recent period"); |
| _counter_replicas_learning_recent_round_start_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.round.start.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "learning round start count in the recent period"); |
| _counter_replicas_learning_recent_copy_file_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.copy.file.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "learning copy file count in the recent period"); |
| _counter_replicas_learning_recent_copy_file_size.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.copy.file.size", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "learning copy file size in the recent period"); |
| _counter_replicas_learning_recent_copy_buffer_size.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.copy.buffer.size", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "learning copy buffer size in the recent period"); |
| _counter_replicas_learning_recent_learn_cache_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.learn.cache.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "learning LT_CACHE count in the recent period"); |
| _counter_replicas_learning_recent_learn_app_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.learn.app.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "learning LT_APP count in the recent period"); |
| _counter_replicas_learning_recent_learn_log_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.learn.log.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "learning LT_LOG count in the recent period"); |
| _counter_replicas_learning_recent_learn_reset_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.learn.reset.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "learning reset count in the recent period" |
| "for the reason of resp.last_committed_decree < _app->last_committed_decree()"); |
| _counter_replicas_learning_recent_learn_fail_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.learn.fail.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "learning fail count in the recent period"); |
| _counter_replicas_learning_recent_learn_succ_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.learning.recent.learn.succ.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "learning succeed count in the recent period"); |
| |
| _counter_replicas_recent_prepare_fail_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.recent.prepare.fail.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "prepare fail count in the recent period"); |
| _counter_replicas_recent_replica_move_error_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.recent.replica.move.error.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "replica move to error count in the recent period"); |
| _counter_replicas_recent_replica_move_garbage_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.recent.replica.move.garbage.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "replica move to garbage count in the recent period"); |
| _counter_replicas_recent_replica_remove_dir_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.recent.replica.remove.dir.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "replica directory remove count in the recent period"); |
| _counter_replicas_error_replica_dir_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.error.replica.dir.count", |
| COUNTER_TYPE_NUMBER, |
| "error replica directory(*.err) count"); |
| _counter_replicas_garbage_replica_dir_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.garbage.replica.dir.count", |
| COUNTER_TYPE_NUMBER, |
| "garbage replica directory(*.gar) count"); |
| _counter_replicas_tmp_replica_dir_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.tmp.replica.dir.count", |
| COUNTER_TYPE_NUMBER, |
| "disk migration tmp replica directory(*.tmp) count"); |
| _counter_replicas_origin_replica_dir_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.origin.replica.dir.count", |
| COUNTER_TYPE_NUMBER, |
| "disk migration origin replica directory(.ori) count"); |
| |
| _counter_replicas_recent_group_check_fail_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.recent.group.check.fail.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "group check fail count in the recent period"); |
| |
| _counter_shared_log_size.init_app_counter( |
| "eon.replica_stub", "shared.log.size(MB)", COUNTER_TYPE_NUMBER, "shared log size(MB)"); |
| _counter_shared_log_recent_write_size.init_app_counter( |
| "eon.replica_stub", |
| "shared.log.recent.write.size", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "shared log write size in the recent period"); |
| _counter_recent_trigger_emergency_checkpoint_count.init_app_counter( |
| "eon.replica_stub", |
| "recent.trigger.emergency.checkpoint.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "trigger emergency checkpoint count in the recent period"); |
| |
| // <- Duplication Metrics -> |
| |
| _counter_dup_confirmed_rate.init_app_counter("eon.replica_stub", |
| "dup.confirmed_rate", |
| COUNTER_TYPE_RATE, |
| "increasing rate of confirmed mutations"); |
| _counter_dup_pending_mutations_count.init_app_counter( |
| "eon.replica_stub", |
| "dup.pending_mutations_count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "number of mutations pending for duplication"); |
| |
| // <- Cold Backup Metrics -> |
| |
| _counter_cold_backup_running_count.init_app_counter("eon.replica_stub", |
| "cold.backup.running.count", |
| COUNTER_TYPE_NUMBER, |
| "current cold backup count"); |
| _counter_cold_backup_recent_start_count.init_app_counter( |
| "eon.replica_stub", |
| "cold.backup.recent.start.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current cold backup start count in the recent period"); |
| _counter_cold_backup_recent_succ_count.init_app_counter( |
| "eon.replica_stub", |
| "cold.backup.recent.succ.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current cold backup succeed count in the recent period"); |
| _counter_cold_backup_recent_fail_count.init_app_counter( |
| "eon.replica_stub", |
| "cold.backup.recent.fail.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current cold backup fail count in the recent period"); |
| _counter_cold_backup_recent_cancel_count.init_app_counter( |
| "eon.replica_stub", |
| "cold.backup.recent.cancel.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current cold backup cancel count in the recent period"); |
| _counter_cold_backup_recent_pause_count.init_app_counter( |
| "eon.replica_stub", |
| "cold.backup.recent.pause.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current cold backup pause count in the recent period"); |
| _counter_cold_backup_recent_upload_file_succ_count.init_app_counter( |
| "eon.replica_stub", |
| "cold.backup.recent.upload.file.succ.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current cold backup upload file succeed count in the recent period"); |
| _counter_cold_backup_recent_upload_file_fail_count.init_app_counter( |
| "eon.replica_stub", |
| "cold.backup.recent.upload.file.fail.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current cold backup upload file failed count in the recent period"); |
| _counter_cold_backup_recent_upload_file_size.init_app_counter( |
| "eon.replica_stub", |
| "cold.backup.recent.upload.file.size", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current cold backup upload file size in the recent perriod"); |
| _counter_cold_backup_max_duration_time_ms.init_app_counter( |
| "eon.replica_stub", |
| "cold.backup.max.duration.time.ms", |
| COUNTER_TYPE_NUMBER, |
| "current cold backup max duration time"); |
| _counter_cold_backup_max_upload_file_size.init_app_counter( |
| "eon.replica_stub", |
| "cold.backup.max.upload.file.size", |
| COUNTER_TYPE_NUMBER, |
| "current cold backup max upload file size"); |
| |
| _counter_recent_read_fail_count.init_app_counter("eon.replica_stub", |
| "recent.read.fail.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "read fail count in the recent period"); |
| _counter_recent_write_fail_count.init_app_counter("eon.replica_stub", |
| "recent.write.fail.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "write fail count in the recent period"); |
| _counter_recent_read_busy_count.init_app_counter("eon.replica_stub", |
| "recent.read.busy.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "read busy count in the recent period"); |
| _counter_recent_write_busy_count.init_app_counter("eon.replica_stub", |
| "recent.write.busy.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "write busy count in the recent period"); |
| |
| _counter_recent_write_size_exceed_threshold_count.init_app_counter( |
| "eon.replica_stub", |
| "recent_write_size_exceed_threshold_count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "write size exceed threshold count in the recent period"); |
| |
| // <- Bulk Load Metrics -> |
| |
| _counter_bulk_load_running_count.init_app_counter("eon.replica_stub", |
| "bulk.load.running.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current bulk load running count"); |
| _counter_bulk_load_downloading_count.init_app_counter("eon.replica_stub", |
| "bulk.load.downloading.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current bulk load downloading count"); |
| _counter_bulk_load_ingestion_count.init_app_counter("eon.replica_stub", |
| "bulk.load.ingestion.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current bulk load ingestion count"); |
| _counter_bulk_load_succeed_count.init_app_counter("eon.replica_stub", |
| "bulk.load.succeed.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current bulk load succeed count"); |
| _counter_bulk_load_failed_count.init_app_counter("eon.replica_stub", |
| "bulk.load.failed.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current bulk load failed count"); |
| _counter_bulk_load_download_file_succ_count.init_app_counter( |
| "eon.replica_stub", |
| "bulk.load.download.file.success.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "bulk load recent download file success count"); |
| _counter_bulk_load_download_file_fail_count.init_app_counter( |
| "eon.replica_stub", |
| "bulk.load.download.file.fail.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "bulk load recent download file failed count"); |
| _counter_bulk_load_download_file_size.init_app_counter("eon.replica_stub", |
| "bulk.load.download.file.size", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "bulk load recent download file size"); |
| _counter_bulk_load_max_ingestion_time_ms.init_app_counter( |
| "eon.replica_stub", |
| "bulk.load.max.ingestion.duration.time.ms", |
| COUNTER_TYPE_NUMBER, |
| "bulk load max ingestion duration time(ms)"); |
| _counter_bulk_load_max_duration_time_ms.init_app_counter("eon.replica_stub", |
| "bulk.load.max.duration.time.ms", |
| COUNTER_TYPE_NUMBER, |
| "bulk load max duration time(ms)"); |
| |
| #ifdef DSN_ENABLE_GPERF |
| _counter_tcmalloc_release_memory_size.init_app_counter("eon.replica_stub", |
| "tcmalloc.release.memory.size", |
| COUNTER_TYPE_NUMBER, |
| "current tcmalloc release memory size"); |
| #endif |
| |
| // <- Partition split Metrics -> |
| |
| _counter_replicas_splitting_count.init_app_counter("eon.replica_stub", |
| "replicas.splitting.count", |
| COUNTER_TYPE_NUMBER, |
| "current partition splitting count"); |
| |
| _counter_replicas_splitting_max_duration_time_ms.init_app_counter( |
| "eon.replica_stub", |
| "replicas.splitting.max.duration.time(ms)", |
| COUNTER_TYPE_NUMBER, |
| "current partition splitting max duration time(ms)"); |
| _counter_replicas_splitting_max_async_learn_time_ms.init_app_counter( |
| "eon.replica_stub", |
| "replicas.splitting.max.async.learn.time(ms)", |
| COUNTER_TYPE_NUMBER, |
| "current partition splitting max async learn time(ms)"); |
| _counter_replicas_splitting_max_copy_file_size.init_app_counter( |
| "eon.replica_stub", |
| "replicas.splitting.max.copy.file.size", |
| COUNTER_TYPE_NUMBER, |
| "current splitting max copy file size"); |
| _counter_replicas_splitting_recent_start_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.splitting.recent.start.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "current splitting start count in the recent period"); |
| _counter_replicas_splitting_recent_copy_file_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.splitting.recent.copy.file.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "splitting copy file count in the recent period"); |
| _counter_replicas_splitting_recent_copy_file_size.init_app_counter( |
| "eon.replica_stub", |
| "replicas.splitting.recent.copy.file.size", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "splitting copy file size in the recent period"); |
| _counter_replicas_splitting_recent_copy_mutation_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.splitting.recent.copy.mutation.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "splitting copy mutation count in the recent period"); |
| _counter_replicas_splitting_recent_split_succ_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.splitting.recent.split.succ.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "splitting succeed count in the recent period"); |
| _counter_replicas_splitting_recent_split_fail_count.init_app_counter( |
| "eon.replica_stub", |
| "replicas.splitting.recent.split.fail.count", |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "splitting fail count in the recent period"); |
| } |
| |
| void replica_stub::initialize(bool clear /* = false*/) |
| { |
| replication_options opts; |
| opts.initialize(); |
| initialize(opts, clear); |
| _access_controller = std::make_unique<dsn::security::access_controller>(); |
| } |
| |
// Full initialization of the replica stub:
//   1. record the primary address and the option set;
//   2. optionally wipe the slog and all data directories (clear == true);
//   3. initialize the file-system manager and the shared log;
//   4. load every replica found on disk (one task per directory);
//   5. replay the shared log; on failure, discard all loaded replicas and
//      restart with an empty shared log;
//   6. start the gc and disk-stat background timers;
//   7. run initialize_start(), possibly delayed so that the failure
//      detector on the meta server side has timed this node out first.
void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/)
{
    _primary_address = dsn_primary_address();
    // NOTE(review): plain strcpy -- assumes _primary_address_str is large
    // enough for any address string; confirm the buffer size at its
    // declaration.
    strcpy(_primary_address_str, _primary_address.to_string());
    LOG_INFO("primary_address = {}", _primary_address_str);

    set_options(opts);
    // Log the configured meta servers as a comma-separated list.
    std::ostringstream oss;
    for (int i = 0; i < _options.meta_servers.size(); ++i) {
        if (i != 0)
            oss << ",";
        oss << _options.meta_servers[i].to_string();
    }
    LOG_INFO("meta_servers = {}", oss.str());

    // Snapshot startup flags into member state.
    _deny_client = FLAGS_deny_client_on_start;
    _verbose_client_log = FLAGS_verbose_client_log_on_start;
    _verbose_commit_log = FLAGS_verbose_commit_log_on_start;
    _release_tcmalloc_memory = FLAGS_mem_release_enabled;
    _mem_release_max_reserved_mem_percentage = FLAGS_mem_release_max_reserved_mem_percentage;
    _max_concurrent_bulk_load_downloading_count =
        _options.max_concurrent_bulk_load_downloading_count;

    // clear dirs if need
    if (clear) {
        CHECK(dsn::utils::filesystem::remove_path(_options.slog_dir),
              "Fail to remove {}.",
              _options.slog_dir);
        for (auto &dir : _options.data_dirs) {
            CHECK(dsn::utils::filesystem::remove_path(dir), "Fail to remove {}.", dir);
        }
    }

    // Initialize the file system manager.
    _fs_manager.initialize(_options.data_dirs, _options.data_dir_tags);

    // TODO(yingchun): remove the slog related code.
    // Create slog directory if it does not exist.
    std::string cdir;
    std::string err_msg;
    CHECK(utils::filesystem::create_directory(_options.slog_dir, cdir, err_msg), err_msg);
    // Keep the canonicalized path returned by create_directory().
    _options.slog_dir = cdir;

    // Initialize slog.
    _log = new mutation_log_shared(_options.slog_dir,
                                   FLAGS_log_shared_file_size_mb,
                                   FLAGS_log_shared_force_flush,
                                   &_counter_shared_log_recent_write_size);
    LOG_INFO("slog_dir = {}", _options.slog_dir);

    // Start to load replicas in available data directories.
    LOG_INFO("start to load replicas");

    // Collect all candidate replica directories across every data dir node.
    std::vector<std::string> dir_list;
    for (const auto &dn : _fs_manager.get_dir_nodes()) {
        std::vector<std::string> tmp_list;
        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, tmp_list, false),
              "Fail to get subdirectories in {}.",
              dn->full_dir);
        dir_list.insert(dir_list.end(), tmp_list.begin(), tmp_list.end());
    }

    // Load replicas concurrently; `rps` is shared among the load tasks and
    // therefore guarded by `rps_lock`.
    replicas rps;
    utils::ex_lock rps_lock;
    std::deque<task_ptr> load_tasks;
    uint64_t start_time = dsn_now_ms();
    for (auto &dir : dir_list) {
        if (dsn::replication::is_data_dir_invalid(dir)) {
            LOG_INFO("ignore dir {}", dir);
            continue;
        }

        load_tasks.push_back(
            tasking::create_task(LPC_REPLICATION_INIT_LOAD,
                                 &_tracker,
                                 [this, dir, &rps, &rps_lock] {
                                     LOG_INFO("process dir {}", dir);

                                     auto r = load_replica(dir.c_str());
                                     if (r != nullptr) {
                                         LOG_INFO("{}@{}: load replica '{}' success, <durable, "
                                                  "commit> = <{}, {}>, last_prepared_decree = {}",
                                                  r->get_gpid(),
                                                  dsn_primary_address(),
                                                  dir,
                                                  r->last_durable_decree(),
                                                  r->last_committed_decree(),
                                                  r->last_prepared_decree());

                                         // Two directories resolving to the
                                         // same gpid is a fatal on-disk
                                         // inconsistency.
                                         utils::auto_lock<utils::ex_lock> l(rps_lock);
                                         CHECK(rps.find(r->get_gpid()) == rps.end(),
                                               "conflict replica dir: {} <--> {}",
                                               r->dir(),
                                               rps[r->get_gpid()]->dir());

                                         rps[r->get_gpid()] = r;
                                     }
                                 },
                                 load_tasks.size()));
        load_tasks.back()->enqueue();
    }
    // Wait for all load tasks before touching `rps` single-threaded below.
    for (auto &tsk : load_tasks) {
        tsk->wait();
    }
    uint64_t finish_time = dsn_now_ms();

    dir_list.clear();
    load_tasks.clear();
    LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms",
             rps.size(),
             finish_time - start_time);

    // init shared prepare log
    LOG_INFO("start to replay shared log");

    // Mutations at or below a replica's last committed decree need not be
    // replayed for that replica.
    std::map<gpid, decree> replay_condition;
    for (auto it = rps.begin(); it != rps.end(); ++it) {
        replay_condition[it->first] = it->second->last_committed_decree();
    }

    start_time = dsn_now_ms();
    error_code err = _log->open(
        // Route each replayed mutation to its owning replica; mutations for
        // unknown gpids are dropped (return false).
        [&rps](int log_length, mutation_ptr &mu) {
            auto it = rps.find(mu->data.header.pid);
            if (it != rps.end()) {
                return it->second->replay_mutation(mu, false);
            } else {
                return false;
            }
        },
        [this](error_code err) { this->handle_log_failure(err); },
        replay_condition);
    finish_time = dsn_now_ms();

    if (err == ERR_OK) {
        LOG_INFO("replay shared log succeed, time_used = {} ms", finish_time - start_time);
    } else {
        LOG_ERROR("replay shared log failed, err = {}, time_used = {} ms, clear all logs ...",
                  err,
                  finish_time - start_time);

        // we must delete or update meta server the error for all replicas
        // before we fix the logs
        // otherwise, the next process restart may consider the replicas'
        // state complete

        // delete all replicas
        // TODO: checkpoint latest state and update on meta server so learning is cheaper
        for (auto it = rps.begin(); it != rps.end(); ++it) {
            it->second->close();
            move_to_err_path(it->second->dir(), "initialize replica");
            _counter_replicas_recent_replica_move_error_count->increment();
        }
        rps.clear();

        // restart log service
        _log->close();
        _log = nullptr;
        CHECK(utils::filesystem::remove_path(_options.slog_dir),
              "remove directory {} failed",
              _options.slog_dir);
        _log = new mutation_log_shared(_options.slog_dir,
                                       FLAGS_log_shared_file_size_mb,
                                       FLAGS_log_shared_force_flush,
                                       &_counter_shared_log_recent_write_size);
        CHECK_EQ_MSG(_log->open(nullptr, [this](error_code err) { this->handle_log_failure(err); }),
                     ERR_OK,
                     "restart log service failed");
    }

    // NOTE(review): is_log_complete is never set to false anywhere in this
    // function, so the "mark all replicas inactive" branch below is
    // currently dead code.
    bool is_log_complete = true;
    for (auto it = rps.begin(); it != rps.end(); ++it) {
        CHECK_EQ_MSG(it->second->background_sync_checkpoint(), ERR_OK, "sync checkpoint failed");

        it->second->reset_prepare_list_after_replay();

        decree pmax = invalid_decree;
        decree pmax_commit = invalid_decree;
        if (it->second->private_log()) {
            pmax = it->second->private_log()->max_decree(it->first);
            pmax_commit = it->second->private_log()->max_commit_on_disk();
        }

        LOG_INFO(
            "{}: load replica done, err = {}, durable = {}, committed = {}, "
            "prepared = {}, ballot = {}, "
            "valid_offset_in_plog = {}, max_decree_in_plog = {}, max_commit_on_disk_in_plog = {}, "
            "valid_offset_in_slog = {}",
            it->second->name(),
            err.to_string(),
            it->second->last_durable_decree(),
            it->second->last_committed_decree(),
            it->second->max_prepared_decree(),
            it->second->get_ballot(),
            it->second->get_app()->init_info().init_offset_in_private_log,
            pmax,
            pmax_commit,
            it->second->get_app()->init_info().init_offset_in_shared_log);
    }

    // we will mark all replicas inactive not transient unless all logs are complete
    if (!is_log_complete) {
        LOG_ERROR("logs are not complete for some replicas, which means that shared log is "
                  "truncated, mark all replicas as inactive");
        for (auto it = rps.begin(); it != rps.end(); ++it) {
            it->second->set_inactive_state_transient(false);
        }
    }

    // gc
    if (!FLAGS_gc_disabled) {
        // Random first-run delay spreads gc across replica servers.
        _gc_timer_task = tasking::enqueue_timer(
            LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
            &_tracker,
            [this] { on_gc(); },
            std::chrono::milliseconds(FLAGS_gc_interval_ms),
            0,
            std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms)));
    }

    // disk stat
    if (!FLAGS_disk_stat_disabled) {
        _disk_stat_timer_task =
            ::dsn::tasking::enqueue_timer(LPC_DISK_STAT,
                                          &_tracker,
                                          [this]() { on_disk_stat(); },
                                          std::chrono::seconds(FLAGS_disk_stat_interval_seconds),
                                          0,
                                          std::chrono::seconds(FLAGS_disk_stat_interval_seconds));
    }

    // attach rps
    _replicas = std::move(rps);
    _counter_replicas_count->add((uint64_t)_replicas.size());
    for (const auto &kv : _replicas) {
        _fs_manager.add_replica(kv.first, kv.second->dir());
    }

    _nfs = dsn::nfs_node::create();
    _nfs->start();

    dist::cmd::register_remote_command_rpc();

    if (FLAGS_delay_for_fd_timeout_on_start) {
        uint64_t now_time_ms = dsn_now_ms();
        uint64_t delay_time_ms =
            (FLAGS_fd_grace_seconds + 3) * 1000; // for more 3 seconds than grace seconds
        if (now_time_ms < dsn::utils::process_start_millis() + delay_time_ms) {
            uint64_t delay = dsn::utils::process_start_millis() + delay_time_ms - now_time_ms;
            LOG_INFO("delay for {} ms to make failure detector timeout", delay);
            tasking::enqueue(LPC_REPLICA_SERVER_DELAY_START,
                             &_tracker,
                             [this]() { this->initialize_start(); },
                             0,
                             std::chrono::milliseconds(delay));
        } else {
            initialize_start();
        }
    } else {
        initialize_start();
    }
}
| |
// Second-phase startup, run once (idempotent via _is_running): starts the
// config-sync and memory-release timers, the duplication sync timer, the
// backup server, and the failure detector (or jumps straight to
// NS_Connected when FD is disabled).
void replica_stub::initialize_start()
{
    // Guard against double start (e.g. delayed task racing a direct call).
    if (_is_running) {
        return;
    }

    // start timer for configuration sync
    if (!FLAGS_config_sync_disabled) {
        _config_sync_timer_task =
            tasking::enqueue_timer(LPC_QUERY_CONFIGURATION_ALL,
                                   &_tracker,
                                   [this]() {
                                       // _state_lock serializes the sync
                                       // with other state transitions.
                                       zauto_lock l(_state_lock);
                                       this->query_configuration_by_node();
                                   },
                                   std::chrono::milliseconds(FLAGS_config_sync_interval_ms),
                                   0,
                                   std::chrono::milliseconds(FLAGS_config_sync_interval_ms));
    }

#ifdef DSN_ENABLE_GPERF
    // Periodically return unused tcmalloc memory to the OS.
    _mem_release_timer_task =
        tasking::enqueue_timer(LPC_MEM_RELEASE,
                               &_tracker,
                               std::bind(&replica_stub::gc_tcmalloc_memory, this, false),
                               std::chrono::milliseconds(FLAGS_mem_release_check_interval_ms),
                               0,
                               std::chrono::milliseconds(FLAGS_mem_release_check_interval_ms));
#endif

    if (FLAGS_duplication_enabled) {
        _duplication_sync_timer = std::make_unique<duplication_sync_timer>(this);
        _duplication_sync_timer->start();
    }

    _backup_server = std::make_unique<replica_backup_server>(this);

    // init liveness monitor
    CHECK_EQ(NS_Disconnected, _state);
    if (!FLAGS_fd_disabled) {
        _failure_detector = std::make_shared<dsn::dist::slave_failure_detector_with_multimaster>(
            _options.meta_servers,
            [this]() { this->on_meta_server_disconnected(); },
            [this]() { this->on_meta_server_connected(); });

        CHECK_EQ_MSG(_failure_detector->start(FLAGS_fd_check_interval_seconds,
                                              FLAGS_fd_beacon_interval_seconds,
                                              FLAGS_fd_lease_seconds,
                                              FLAGS_fd_grace_seconds),
                     ERR_OK,
                     "FD start failed");

        _failure_detector->register_master(_failure_detector->current_server_contact());
    } else {
        // Without FD, consider ourselves connected immediately.
        _state = NS_Connected;
    }

    _is_running = true;
}
| |
| dsn::error_code replica_stub::on_kill_replica(gpid id) |
| { |
| LOG_INFO("kill replica: gpid = {}", id); |
| if (id.get_app_id() == -1 || id.get_partition_index() == -1) { |
| replicas rs; |
| { |
| zauto_read_lock l(_replicas_lock); |
| rs = _replicas; |
| } |
| for (auto it = rs.begin(); it != rs.end(); ++it) { |
| replica_ptr &r = it->second; |
| if (id.get_app_id() == -1 || id.get_app_id() == r->get_gpid().get_app_id()) |
| r->inject_error(ERR_INJECTED); |
| } |
| return ERR_OK; |
| } else { |
| error_code err = ERR_INVALID_PARAMETERS; |
| replica_ptr r = get_replica(id); |
| if (r == nullptr) { |
| err = ERR_OBJECT_NOT_FOUND; |
| } else { |
| r->inject_error(ERR_INJECTED); |
| err = ERR_OK; |
| } |
| return err; |
| } |
| } |
| |
| replica_ptr replica_stub::get_replica(gpid id) const |
| { |
| zauto_read_lock l(_replicas_lock); |
| auto it = _replicas.find(id); |
| if (it != _replicas.end()) |
| return it->second; |
| else |
| return nullptr; |
| } |
| |
| replica_stub::replica_life_cycle replica_stub::get_replica_life_cycle(gpid id) |
| { |
| zauto_read_lock l(_replicas_lock); |
| if (_opening_replicas.find(id) != _opening_replicas.end()) |
| return replica_stub::RL_creating; |
| if (_replicas.find(id) != _replicas.end()) |
| return replica_stub::RL_serving; |
| if (_closing_replicas.find(id) != _closing_replicas.end()) |
| return replica_stub::RL_closing; |
| if (_closed_replicas.find(id) != _closed_replicas.end()) |
| return replica_stub::RL_closed; |
| return replica_stub::RL_invalid; |
| } |
| |
| void replica_stub::on_client_write(gpid id, dsn::message_ex *request) |
| { |
| if (_deny_client) { |
| // ignore and do not reply |
| return; |
| } |
| if (_verbose_client_log && request) { |
| LOG_INFO("{}@{}: client = {}, code = {}, timeout = {}", |
| id, |
| _primary_address_str, |
| request->header->from_address, |
| request->header->rpc_name, |
| request->header->client.timeout_ms); |
| } |
| replica_ptr rep = get_replica(id); |
| if (rep != nullptr) { |
| rep->on_client_write(request); |
| } else { |
| response_client(id, false, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND); |
| } |
| } |
| |
| void replica_stub::on_client_read(gpid id, dsn::message_ex *request) |
| { |
| if (_deny_client) { |
| // ignore and do not reply |
| return; |
| } |
| if (_verbose_client_log && request) { |
| LOG_INFO("{}@{}: client = {}, code = {}, timeout = {}", |
| id, |
| _primary_address_str, |
| request->header->from_address, |
| request->header->rpc_name, |
| request->header->client.timeout_ms); |
| } |
| replica_ptr rep = get_replica(id); |
| if (rep != nullptr) { |
| rep->on_client_read(request); |
| } else { |
| response_client(id, true, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND); |
| } |
| } |
| |
| void replica_stub::on_config_proposal(const configuration_update_request &proposal) |
| { |
| if (!is_connected()) { |
| LOG_WARNING("{}@{}: received config proposal {} for {}: not connected, ignore", |
| proposal.config.pid, |
| _primary_address_str, |
| enum_to_string(proposal.type), |
| proposal.node); |
| return; |
| } |
| |
| LOG_INFO("{}@{}: received config proposal {} for {}", |
| proposal.config.pid, |
| _primary_address_str, |
| enum_to_string(proposal.type), |
| proposal.node); |
| |
| replica_ptr rep = get_replica(proposal.config.pid); |
| if (rep == nullptr) { |
| if (proposal.type == config_type::CT_ASSIGN_PRIMARY) { |
| std::shared_ptr<configuration_update_request> req2(new configuration_update_request); |
| *req2 = proposal; |
| begin_open_replica(proposal.info, proposal.config.pid, nullptr, req2); |
| } else if (proposal.type == config_type::CT_UPGRADE_TO_PRIMARY) { |
| remove_replica_on_meta_server(proposal.info, proposal.config); |
| } |
| } |
| |
| if (rep != nullptr) { |
| rep->on_config_proposal((configuration_update_request &)proposal); |
| } |
| } |
| |
| void replica_stub::on_query_decree(query_replica_decree_rpc rpc) |
| { |
| const query_replica_decree_request &req = rpc.request(); |
| query_replica_decree_response &resp = rpc.response(); |
| |
| replica_ptr rep = get_replica(req.pid); |
| if (rep != nullptr) { |
| resp.err = ERR_OK; |
| if (partition_status::PS_POTENTIAL_SECONDARY == rep->status()) { |
| resp.last_decree = 0; |
| } else { |
| resp.last_decree = rep->last_committed_decree(); |
| // TODO: use the following to alleviate data lost |
| // resp.last_decree = rep->last_prepared_decree(); |
| } |
| } else { |
| resp.err = ERR_OBJECT_NOT_FOUND; |
| resp.last_decree = 0; |
| } |
| } |
| |
| void replica_stub::on_query_replica_info(query_replica_info_rpc rpc) |
| { |
| query_replica_info_response &resp = rpc.response(); |
| std::set<gpid> visited_replicas; |
| { |
| zauto_read_lock l(_replicas_lock); |
| for (auto it = _replicas.begin(); it != _replicas.end(); ++it) { |
| replica_ptr &r = it->second; |
| replica_info info; |
| get_replica_info(info, r); |
| if (visited_replicas.find(info.pid) == visited_replicas.end()) { |
| visited_replicas.insert(info.pid); |
| resp.replicas.push_back(std::move(info)); |
| } |
| } |
| for (auto it = _closing_replicas.begin(); it != _closing_replicas.end(); ++it) { |
| const replica_info &info = std::get<3>(it->second); |
| if (visited_replicas.find(info.pid) == visited_replicas.end()) { |
| visited_replicas.insert(info.pid); |
| resp.replicas.push_back(info); |
| } |
| } |
| for (auto it = _closed_replicas.begin(); it != _closed_replicas.end(); ++it) { |
| const replica_info &info = it->second.second; |
| if (visited_replicas.find(info.pid) == visited_replicas.end()) { |
| visited_replicas.insert(info.pid); |
| resp.replicas.push_back(info); |
| } |
| } |
| } |
| resp.err = ERR_OK; |
| } |
| |
| void replica_stub::on_query_last_checkpoint(query_last_checkpoint_info_rpc rpc) |
| { |
| const learn_request &request = rpc.request(); |
| learn_response &response = rpc.response(); |
| |
| replica_ptr rep = get_replica(request.pid); |
| if (rep != nullptr) { |
| rep->on_query_last_checkpoint(response); |
| } else { |
| response.err = ERR_OBJECT_NOT_FOUND; |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_DEFAULT |
| void replica_stub::on_query_disk_info(query_disk_info_rpc rpc) |
| { |
| const query_disk_info_request &req = rpc.request(); |
| query_disk_info_response &resp = rpc.response(); |
| int app_id = 0; |
| if (!req.app_name.empty()) { |
| zauto_read_lock l(_replicas_lock); |
| app_id = get_app_id_from_replicas(req.app_name); |
| if (app_id == 0) { |
| resp.err = ERR_OBJECT_NOT_FOUND; |
| return; |
| } |
| } |
| |
| for (const auto &dir_node : _fs_manager._dir_nodes) { |
| disk_info info; |
| // app_name empty means query all app replica_count |
| if (req.app_name.empty()) { |
| info.holding_primary_replicas = dir_node->holding_primary_replicas; |
| info.holding_secondary_replicas = dir_node->holding_secondary_replicas; |
| } else { |
| const auto &primary_iter = dir_node->holding_primary_replicas.find(app_id); |
| if (primary_iter != dir_node->holding_primary_replicas.end()) { |
| info.holding_primary_replicas[app_id] = primary_iter->second; |
| } |
| |
| const auto &secondary_iter = dir_node->holding_secondary_replicas.find(app_id); |
| if (secondary_iter != dir_node->holding_secondary_replicas.end()) { |
| info.holding_secondary_replicas[app_id] = secondary_iter->second; |
| } |
| } |
| info.tag = dir_node->tag; |
| info.full_dir = dir_node->full_dir; |
| info.disk_capacity_mb = dir_node->disk_capacity_mb; |
| info.disk_available_mb = dir_node->disk_available_mb; |
| |
| resp.disk_infos.emplace_back(info); |
| } |
| |
| resp.total_capacity_mb = _fs_manager._total_capacity_mb; |
| resp.total_available_mb = _fs_manager._total_available_mb; |
| |
| resp.err = ERR_OK; |
| } |
| |
| void replica_stub::on_disk_migrate(replica_disk_migrate_rpc rpc) |
| { |
| const replica_disk_migrate_request &request = rpc.request(); |
| replica_disk_migrate_response &response = rpc.response(); |
| |
| replica_ptr rep = get_replica(request.pid); |
| if (rep != nullptr) { |
| rep->disk_migrator()->on_migrate_replica(rpc); // THREAD_POOL_DEFAULT |
| } else { |
| response.err = ERR_OBJECT_NOT_FOUND; |
| } |
| } |
| |
| void replica_stub::on_query_app_info(query_app_info_rpc rpc) |
| { |
| const query_app_info_request &req = rpc.request(); |
| query_app_info_response &resp = rpc.response(); |
| |
| LOG_INFO("got query app info request from ({})", req.meta_server); |
| resp.err = dsn::ERR_OK; |
| std::set<app_id> visited_apps; |
| { |
| zauto_read_lock l(_replicas_lock); |
| for (auto it = _replicas.begin(); it != _replicas.end(); ++it) { |
| replica_ptr &r = it->second; |
| const app_info &info = *r->get_app_info(); |
| if (visited_apps.find(info.app_id) == visited_apps.end()) { |
| resp.apps.push_back(info); |
| visited_apps.insert(info.app_id); |
| } |
| } |
| for (auto it = _closing_replicas.begin(); it != _closing_replicas.end(); ++it) { |
| const app_info &info = std::get<2>(it->second); |
| if (visited_apps.find(info.app_id) == visited_apps.end()) { |
| resp.apps.push_back(info); |
| visited_apps.insert(info.app_id); |
| } |
| } |
| for (auto it = _closed_replicas.begin(); it != _closed_replicas.end(); ++it) { |
| const app_info &info = it->second.first; |
| if (visited_apps.find(info.app_id) == visited_apps.end()) { |
| resp.apps.push_back(info); |
| visited_apps.insert(info.app_id); |
| } |
| } |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_DEFAULT |
| void replica_stub::on_add_new_disk(add_new_disk_rpc rpc) |
| { |
| const auto &disk_str = rpc.request().disk_str; |
| auto &resp = rpc.response(); |
| resp.err = ERR_OK; |
| |
| std::vector<std::string> data_dirs; |
| std::vector<std::string> data_dir_tags; |
| std::string err_msg; |
| if (disk_str.empty() || |
| !replication_options::get_data_dir_and_tag( |
| disk_str, "", "replica", data_dirs, data_dir_tags, err_msg)) { |
| resp.err = ERR_INVALID_PARAMETERS; |
| resp.__set_err_hint(fmt::format("invalid str({}), err_msg: {}", disk_str, err_msg)); |
| return; |
| } |
| |
| for (auto i = 0; i < data_dir_tags.size(); ++i) { |
| auto dir = data_dirs[i]; |
| if (_fs_manager.is_dir_node_available(dir, data_dir_tags[i])) { |
| resp.err = ERR_NODE_ALREADY_EXIST; |
| resp.__set_err_hint( |
| fmt::format("data_dir({}) tag({}) already available", dir, data_dir_tags[i])); |
| return; |
| } |
| |
| if (dsn_unlikely(utils::filesystem::directory_exists(dir) && |
| !utils::filesystem::is_directory_empty(dir).second)) { |
| resp.err = ERR_DIR_NOT_EMPTY; |
| resp.__set_err_hint(fmt::format("Disk({}) directory is not empty", dir)); |
| return; |
| } |
| |
| std::string cdir; |
| if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) || |
| !utils::filesystem::check_dir_rw(dir, err_msg))) { |
| resp.err = ERR_FILE_OPERATION_FAILED; |
| resp.__set_err_hint(err_msg); |
| return; |
| } |
| |
| LOG_INFO("Add a new disk in fs_manager, data_dir={}, tag={}", cdir, data_dir_tags[i]); |
| // TODO(yingchun): there is a gap between _fs_manager.is_dir_node_exist() and |
| // _fs_manager.add_new_dir_node() which is not atomic. |
| _fs_manager.add_new_dir_node(cdir, data_dir_tags[i]); |
| } |
| } |
| |
| void replica_stub::on_nfs_copy(const ::dsn::service::copy_request &request, |
| ::dsn::rpc_replier<::dsn::service::copy_response> &reply) |
| { |
| if (check_status_and_authz_with_reply(request, reply, ranger::access_type::kWrite)) { |
| _nfs->on_copy(request, reply); |
| } |
| } |
| |
| void replica_stub::on_nfs_get_file_size( |
| const ::dsn::service::get_file_size_request &request, |
| ::dsn::rpc_replier<::dsn::service::get_file_size_response> &reply) |
| { |
| if (check_status_and_authz_with_reply(request, reply, ranger::access_type::kWrite)) { |
| _nfs->on_get_file_size(request, reply); |
| } |
| } |
| |
| void replica_stub::on_prepare(dsn::message_ex *request) |
| { |
| gpid id; |
| dsn::unmarshall(request, id); |
| replica_ptr rep = get_replica(id); |
| if (rep != nullptr) { |
| rep->on_prepare(request); |
| } else { |
| prepare_ack resp; |
| resp.pid = id; |
| resp.err = ERR_OBJECT_NOT_FOUND; |
| reply(request, resp); |
| } |
| } |
| |
| void replica_stub::on_group_check(group_check_rpc rpc) |
| { |
| const group_check_request &request = rpc.request(); |
| group_check_response &response = rpc.response(); |
| if (!is_connected()) { |
| LOG_WARNING("{}@{}: received group check: not connected, ignore", |
| request.config.pid, |
| _primary_address_str); |
| return; |
| } |
| |
| LOG_INFO("{}@{}: received group check, primary = {}, ballot = {}, status = {}, " |
| "last_committed_decree = {}", |
| request.config.pid, |
| _primary_address_str, |
| request.config.primary, |
| request.config.ballot, |
| enum_to_string(request.config.status), |
| request.last_committed_decree); |
| |
| replica_ptr rep = get_replica(request.config.pid); |
| if (rep != nullptr) { |
| rep->on_group_check(request, response); |
| } else { |
| if (request.config.status == partition_status::PS_POTENTIAL_SECONDARY) { |
| std::shared_ptr<group_check_request> req(new group_check_request); |
| *req = request; |
| |
| begin_open_replica(request.app, request.config.pid, req, nullptr); |
| response.err = ERR_OK; |
| response.learner_signature = invalid_signature; |
| } else { |
| response.err = ERR_OBJECT_NOT_FOUND; |
| } |
| } |
| } |
| |
| void replica_stub::on_learn(dsn::message_ex *msg) |
| { |
| learn_response response; |
| learn_request request; |
| ::dsn::unmarshall(msg, request); |
| |
| replica_ptr rep = get_replica(request.pid); |
| if (rep != nullptr) { |
| if (!rep->access_controller_allowed(msg, ranger::access_type::kWrite)) { |
| response.err = ERR_ACL_DENY; |
| reply(msg, response); |
| return; |
| } |
| rep->on_learn(msg, request); |
| } else { |
| response.err = ERR_OBJECT_NOT_FOUND; |
| reply(msg, response); |
| } |
| } |
| |
| void replica_stub::on_learn_completion_notification(learn_completion_notification_rpc rpc) |
| { |
| const group_check_response &report = rpc.request(); |
| learn_notify_response &response = rpc.response(); |
| response.pid = report.pid; |
| response.signature = report.learner_signature; |
| replica_ptr rep = get_replica(report.pid); |
| if (rep != nullptr) { |
| rep->on_learn_completion_notification(report, response); |
| } else { |
| response.err = ERR_OBJECT_NOT_FOUND; |
| } |
| } |
| |
| void replica_stub::on_add_learner(const group_check_request &request) |
| { |
| if (!is_connected()) { |
| LOG_WARNING("{}@{}: received add learner, primary = {}, not connected, ignore", |
| request.config.pid, |
| _primary_address_str, |
| request.config.primary); |
| return; |
| } |
| |
| LOG_INFO("{}@{}: received add learner, primary = {}, ballot = {}, status = {}, " |
| "last_committed_decree = {}", |
| request.config.pid, |
| _primary_address_str, |
| request.config.primary, |
| request.config.ballot, |
| enum_to_string(request.config.status), |
| request.last_committed_decree); |
| |
| replica_ptr rep = get_replica(request.config.pid); |
| if (rep != nullptr) { |
| rep->on_add_learner(request); |
| } else { |
| std::shared_ptr<group_check_request> req(new group_check_request); |
| *req = request; |
| begin_open_replica(request.app, request.config.pid, req, nullptr); |
| } |
| } |
| |
| void replica_stub::on_remove(const replica_configuration &request) |
| { |
| replica_ptr rep = get_replica(request.pid); |
| if (rep != nullptr) { |
| rep->on_remove(request); |
| } |
| } |
| |
| void replica_stub::get_replica_info(replica_info &info, replica_ptr r) |
| { |
| info.pid = r->get_gpid(); |
| info.ballot = r->get_ballot(); |
| info.status = r->status(); |
| info.app_type = r->get_app_info()->app_type; |
| info.last_committed_decree = r->last_committed_decree(); |
| info.last_prepared_decree = r->last_prepared_decree(); |
| info.last_durable_decree = r->last_durable_decree(); |
| |
| dsn::error_code err = _fs_manager.get_disk_tag(r->dir(), info.disk_tag); |
| if (dsn::ERR_OK != err) { |
| LOG_WARNING("get disk tag of {} failed: {}", r->dir(), err); |
| } |
| |
| info.__set_manual_compact_status(r->get_manual_compact_status()); |
| } |
| |
| void replica_stub::get_local_replicas(std::vector<replica_info> &replicas) |
| { |
| zauto_read_lock l(_replicas_lock); |
| // local_replicas = replicas + closing_replicas + closed_replicas |
| int total_replicas = _replicas.size() + _closing_replicas.size() + _closed_replicas.size(); |
| replicas.reserve(total_replicas); |
| |
| for (auto &pairs : _replicas) { |
| replica_ptr &rep = pairs.second; |
| // child partition should not sync config from meta server |
| // because it is not ready in meta view |
| if (rep->status() == partition_status::PS_PARTITION_SPLIT) { |
| continue; |
| } |
| replica_info info; |
| get_replica_info(info, rep); |
| replicas.push_back(std::move(info)); |
| } |
| |
| for (auto &pairs : _closing_replicas) { |
| replicas.push_back(std::get<3>(pairs.second)); |
| } |
| |
| for (auto &pairs : _closed_replicas) { |
| replicas.push_back(pairs.second.second); |
| } |
| } |
| |
| // run in THREAD_POOL_META_SERVER |
| // assert(_state_lock.locked()) |
| void replica_stub::query_configuration_by_node() |
| { |
| if (_state == NS_Disconnected) { |
| return; |
| } |
| |
| if (_config_query_task != nullptr) { |
| return; |
| } |
| |
| dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_CONFIG_SYNC); |
| |
| configuration_query_by_node_request req; |
| req.node = _primary_address; |
| |
| // TODO: send stored replicas may cost network, we shouldn't config the frequency |
| get_local_replicas(req.stored_replicas); |
| req.__isset.stored_replicas = true; |
| |
| ::dsn::marshall(msg, req); |
| |
| LOG_INFO("send query node partitions request to meta server, stored_replicas_count = {}", |
| req.stored_replicas.size()); |
| |
| rpc_address target(_failure_detector->get_servers()); |
| _config_query_task = |
| rpc::call(target, |
| msg, |
| &_tracker, |
| [this](error_code err, dsn::message_ex *request, dsn::message_ex *resp) { |
| on_node_query_reply(err, request, resp); |
| }); |
| } |
| |
| void replica_stub::on_meta_server_connected() |
| { |
| LOG_INFO("meta server connected"); |
| |
| zauto_lock l(_state_lock); |
| if (_state == NS_Disconnected) { |
| _state = NS_Connecting; |
| tasking::enqueue(LPC_QUERY_CONFIGURATION_ALL, &_tracker, [this]() { |
| zauto_lock l(_state_lock); |
| this->query_configuration_by_node(); |
| }); |
| } |
| } |
| |
// run in THREAD_POOL_META_SERVER
// Handles the meta server's reply to the node-level config sync issued by
// query_configuration_by_node(). On success, scatters per-partition config
// updates to the replication threads and schedules gc for closed replicas the
// meta server reports as garbage.
void replica_stub::on_node_query_reply(error_code err,
                                       dsn::message_ex *request,
                                       dsn::message_ex *response)
{
    LOG_INFO("query node partitions replied, err = {}", err);

    zauto_lock l(_state_lock);
    // The in-flight query finished; clearing the task lets
    // query_configuration_by_node() issue a new one.
    _config_query_task = nullptr;
    if (err != ERR_OK) {
        if (_state == NS_Connecting) {
            // Still handshaking with meta: retry immediately.
            query_configuration_by_node();
        }
    } else {
        if (_state == NS_Connecting) {
            _state = NS_Connected;
        }

        // DO NOT UPDATE STATE WHEN DISCONNECTED
        if (_state != NS_Connected)
            return;

        configuration_query_by_node_response resp;
        ::dsn::unmarshall(response, resp);

        if (resp.err == ERR_BUSY) {
            // Meta is overloaded: back off briefly and retry.
            int delay_ms = 500;
            LOG_INFO("resend query node partitions request after {} ms for resp.err = ERR_BUSY",
                     delay_ms);
            _config_query_task = tasking::enqueue(LPC_QUERY_CONFIGURATION_ALL,
                                                  &_tracker,
                                                  [this]() {
                                                      zauto_lock l(_state_lock);
                                                      _config_query_task = nullptr;
                                                      this->query_configuration_by_node();
                                                  },
                                                  0,
                                                  std::chrono::milliseconds(delay_ms));
            return;
        }
        if (resp.err != ERR_OK) {
            LOG_INFO("ignore query node partitions response for resp.err = {}", resp.err);
            return;
        }

        LOG_INFO("process query node partitions response for resp.err = ERR_OK, "
                 "partitions_count({}), gc_replicas_count({})",
                 resp.partitions.size(),
                 resp.gc_replicas.size());

        // Snapshot the local replica map; entries that appear in the reply are
        // erased below, leaving only replicas unknown to the meta server.
        replicas rs;
        {
            zauto_read_lock l(_replicas_lock);
            rs = _replicas;
        }

        // Scatter each partition's config update to its replication thread.
        for (auto it = resp.partitions.begin(); it != resp.partitions.end(); ++it) {
            rs.erase(it->config.pid);
            tasking::enqueue(LPC_QUERY_NODE_CONFIGURATION_SCATTER,
                             &_tracker,
                             std::bind(&replica_stub::on_node_query_reply_scatter, this, this, *it),
                             it->config.pid.thread_hash());
        }

        // for rps not exist on meta_servers
        for (auto it = rs.begin(); it != rs.end(); ++it) {
            tasking::enqueue(
                LPC_QUERY_NODE_CONFIGURATION_SCATTER2,
                &_tracker,
                std::bind(&replica_stub::on_node_query_reply_scatter2, this, this, it->first),
                it->first.thread_hash());
        }

        // handle the replicas which need to be gc
        if (resp.__isset.gc_replicas) {
            for (replica_info &rep : resp.gc_replicas) {
                replica_stub::replica_life_cycle lc = get_replica_life_cycle(rep.pid);
                if (lc == replica_stub::RL_closed) {
                    tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
                                     &_tracker,
                                     std::bind(&replica_stub::on_gc_replica, this, this, rep.pid),
                                     0);
                }
            }
        }
    }
}
| |
| void replica_stub::set_meta_server_connected_for_test( |
| const configuration_query_by_node_response &resp) |
| { |
| zauto_lock l(_state_lock); |
| CHECK_NE(_state, NS_Connected); |
| _state = NS_Connected; |
| |
| for (auto it = resp.partitions.begin(); it != resp.partitions.end(); ++it) { |
| tasking::enqueue(LPC_QUERY_NODE_CONFIGURATION_SCATTER, |
| &_tracker, |
| std::bind(&replica_stub::on_node_query_reply_scatter, this, this, *it), |
| it->config.pid.thread_hash()); |
| } |
| } |
| |
| void replica_stub::set_replica_state_subscriber_for_test(replica_state_subscriber subscriber, |
| bool is_long_subscriber) |
| { |
| _replica_state_subscriber = subscriber; |
| _is_long_subscriber = is_long_subscriber; |
| } |
| |
| // this_ is used to hold a ref to replica_stub so we don't need to cancel the task on |
| // replica_stub::close |
| // ThreadPool: THREAD_POOL_REPLICATION |
| void replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_, |
| const configuration_update_request &req) |
| { |
| replica_ptr replica = get_replica(req.config.pid); |
| if (replica != nullptr) { |
| replica->on_config_sync(req.info, |
| req.config, |
| req.__isset.meta_split_status ? req.meta_split_status |
| : split_status::NOT_SPLIT); |
| } else { |
| if (req.config.primary == _primary_address) { |
| LOG_INFO("{}@{}: replica not exists on replica server, which is primary, remove it " |
| "from meta server", |
| req.config.pid, |
| _primary_address_str); |
| remove_replica_on_meta_server(req.info, req.config); |
| } else { |
| LOG_INFO( |
| "{}@{}: replica not exists on replica server, which is not primary, just ignore", |
| req.config.pid, |
| _primary_address_str); |
| } |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_REPLICATION |
| void replica_stub::on_node_query_reply_scatter2(replica_stub_ptr this_, gpid id) |
| { |
| replica_ptr replica = get_replica(id); |
| if (replica != nullptr && replica->status() != partition_status::PS_POTENTIAL_SECONDARY && |
| replica->status() != partition_status::PS_PARTITION_SPLIT) { |
| if (replica->status() == partition_status::PS_INACTIVE && |
| dsn_now_ms() - replica->create_time_milliseconds() < |
| FLAGS_gc_memory_replica_interval_ms) { |
| LOG_INFO("{}: replica not exists on meta server, wait to close", replica->name()); |
| return; |
| } |
| |
| LOG_INFO("{}: replica not exists on meta server, remove", replica->name()); |
| |
| // TODO: set PS_INACTIVE instead for further state reuse |
| replica->update_local_configuration_with_no_ballot_change(partition_status::PS_ERROR); |
| } |
| } |
| |
| void replica_stub::remove_replica_on_meta_server(const app_info &info, |
| const partition_configuration &config) |
| { |
| if (FLAGS_fd_disabled) { |
| return; |
| } |
| |
| dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_UPDATE_PARTITION_CONFIGURATION); |
| |
| std::shared_ptr<configuration_update_request> request(new configuration_update_request); |
| request->info = info; |
| request->config = config; |
| request->config.ballot++; |
| request->node = _primary_address; |
| request->type = config_type::CT_DOWNGRADE_TO_INACTIVE; |
| |
| if (_primary_address == config.primary) { |
| request->config.primary.set_invalid(); |
| } else if (replica_helper::remove_node(_primary_address, request->config.secondaries)) { |
| } else { |
| return; |
| } |
| |
| ::dsn::marshall(msg, *request); |
| |
| rpc_address target(_failure_detector->get_servers()); |
| rpc::call(_failure_detector->get_servers(), |
| msg, |
| nullptr, |
| [](error_code err, dsn::message_ex *, dsn::message_ex *) {}); |
| } |
| |
| void replica_stub::on_meta_server_disconnected() |
| { |
| LOG_INFO("meta server disconnected"); |
| |
| zauto_lock l(_state_lock); |
| if (NS_Disconnected == _state) |
| return; |
| |
| _state = NS_Disconnected; |
| |
| replicas rs; |
| { |
| zauto_read_lock l(_replicas_lock); |
| rs = _replicas; |
| } |
| |
| for (auto it = rs.begin(); it != rs.end(); ++it) { |
| tasking::enqueue( |
| LPC_CM_DISCONNECTED_SCATTER, |
| &_tracker, |
| std::bind(&replica_stub::on_meta_server_disconnected_scatter, this, this, it->first), |
| it->first.thread_hash()); |
| } |
| } |
| |
| // this_ is used to hold a ref to replica_stub so we don't need to cancel the task on |
| // replica_stub::close |
| void replica_stub::on_meta_server_disconnected_scatter(replica_stub_ptr this_, gpid id) |
| { |
| { |
| zauto_lock l(_state_lock); |
| if (_state != NS_Disconnected) |
| return; |
| } |
| |
| replica_ptr replica = get_replica(id); |
| if (replica != nullptr) { |
| replica->on_meta_server_disconnected(); |
| } |
| } |
| |
| void replica_stub::response_client(gpid id, |
| bool is_read, |
| dsn::message_ex *request, |
| partition_status::type status, |
| error_code error) |
| { |
| if (error == ERR_BUSY) { |
| if (is_read) |
| _counter_recent_read_busy_count->increment(); |
| else |
| _counter_recent_write_busy_count->increment(); |
| } else if (error != ERR_OK) { |
| if (is_read) |
| _counter_recent_read_fail_count->increment(); |
| else |
| _counter_recent_write_fail_count->increment(); |
| LOG_ERROR("{}@{}: {} fail: client = {}, code = {}, timeout = {}, status = {}, error = {}", |
| id, |
| _primary_address_str, |
| is_read ? "read" : "write", |
| request == nullptr ? "null" : request->header->from_address.to_string(), |
| request == nullptr ? "null" : request->header->rpc_name, |
| request == nullptr ? 0 : request->header->client.timeout_ms, |
| enum_to_string(status), |
| error); |
| } |
| |
| if (request != nullptr) { |
| dsn_rpc_reply(request->create_response(), error); |
| } |
| } |
| |
| void replica_stub::init_gc_for_test() |
| { |
| CHECK(FLAGS_gc_disabled, ""); |
| |
| _gc_timer_task = tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS, |
| &_tracker, |
| [this] { on_gc(); }, |
| 0, |
| std::chrono::milliseconds(FLAGS_gc_interval_ms)); |
| } |
| |
void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
{
    // Dispose of a closed replica's on-disk data: remove it from the stub's
    // bookkeeping, then rename its directory to "<dir>.<timestamp>.gar" so a
    // later cleaner can delete it. On rename failure the bookkeeping is
    // restored so gc can be retried.
    std::pair<app_info, replica_info> closed_info;
    {
        zauto_write_lock l(_replicas_lock);
        auto iter = _closed_replicas.find(id);
        if (iter == _closed_replicas.end())
            return;
        closed_info = iter->second;
        _closed_replicas.erase(iter);
    }
    _fs_manager.remove_replica(id);

    const auto *const dn = _fs_manager.find_replica_dir(closed_info.first.app_type, id);
    if (dn == nullptr) {
        LOG_WARNING(
            "gc closed replica({}.{}) failed, no exist data", id, closed_info.first.app_type);
        return;
    }

    const auto replica_path = dn->replica_dir(closed_info.first.app_type, id);
    CHECK(
        dsn::utils::filesystem::directory_exists(replica_path), "dir({}) not exist", replica_path);
    LOG_INFO("start to move replica({}) as garbage, path: {}", id, replica_path);
    // Microsecond timestamp suffix keeps repeated gc attempts from colliding.
    const auto rename_path = fmt::format("{}.{}.gar", replica_path, dsn_now_us());
    if (!dsn::utils::filesystem::rename_path(replica_path, rename_path)) {
        LOG_WARNING("gc_replica: failed to move directory '{}' to '{}'", replica_path, rename_path);

        // if gc the replica failed, add it back
        {
            zauto_write_lock l(_replicas_lock);
            _closed_replicas.emplace(id, closed_info);
        }
        _fs_manager.add_replica(id, replica_path);
    } else {
        LOG_WARNING("gc_replica: replica_dir_op succeed to move directory '{}' to '{}'",
                    replica_path,
                    rename_path);
        _counter_replicas_recent_replica_move_garbage_count->increment();
    }
}
| |
| void replica_stub::on_gc() |
| { |
| uint64_t start = dsn_now_ns(); |
| |
| struct gc_info |
| { |
| replica_ptr rep; |
| partition_status::type status; |
| mutation_log_ptr plog; |
| decree last_durable_decree; |
| int64_t init_offset_in_shared_log; |
| }; |
| |
| std::unordered_map<gpid, gc_info> rs; |
| { |
| zauto_read_lock l(_replicas_lock); |
| // collect info in lock to prevent the case that the replica is closed in replica::close() |
| for (auto &kv : _replicas) { |
| const replica_ptr &rep = kv.second; |
| gc_info &info = rs[kv.first]; |
| info.rep = rep; |
| info.status = rep->status(); |
| info.plog = rep->private_log(); |
| info.last_durable_decree = rep->last_durable_decree(); |
| info.init_offset_in_shared_log = rep->get_app()->init_info().init_offset_in_shared_log; |
| } |
| } |
| |
| LOG_INFO("start to garbage collection, replica_count = {}", rs.size()); |
| |
| // gc shared prepare log |
| // |
| // Now that checkpoint is very important for gc, we must be able to trigger checkpoint when |
| // necessary. |
| // that is, we should be able to trigger memtable flush when necessary. |
| // |
| // How to trigger memtable flush? |
| // we add a parameter `is_emergency' in dsn_app_async_checkpoint() function, when set true, |
| // the undering storage system should flush memtable as soon as possiable. |
| // |
| // When to trigger memtable flush? |
| // 1. Using `[replication].checkpoint_max_interval_hours' option, we can set max interval time |
| // of two adjacent checkpoints; If the time interval is arrived, then emergency checkpoint |
| // will be triggered. |
| // 2. Using `[replication].log_shared_file_count_limit' option, we can set max file count of |
| // shared log; If the limit is exceeded, then emergency checkpoint will be triggered; Instead |
| // of triggering all replicas to do checkpoint, we will only trigger a few of necessary |
| // replicas which block garbage collection of the oldest log file. |
| // |
| if (_log != nullptr) { |
| replica_log_info_map gc_condition; |
| for (auto &kv : rs) { |
| replica_log_info ri; |
| replica_ptr &rep = kv.second.rep; |
| mutation_log_ptr &plog = kv.second.plog; |
| if (plog) { |
| // flush private log to update plog_max_commit_on_disk, |
| // and just flush once to avoid flushing infinitely |
| plog->flush_once(); |
| |
| decree plog_max_commit_on_disk = plog->max_commit_on_disk(); |
| ri.max_decree = std::min(kv.second.last_durable_decree, plog_max_commit_on_disk); |
| LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, " |
| "last_durable_decree= {}, plog_max_commit_on_disk = {}", |
| rep->name(), |
| enum_to_string(kv.second.status), |
| ri.max_decree, |
| kv.second.last_durable_decree, |
| plog_max_commit_on_disk); |
| } else { |
| ri.max_decree = kv.second.last_durable_decree; |
| LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, " |
| "last_durable_decree = {}", |
| rep->name(), |
| enum_to_string(kv.second.status), |
| ri.max_decree, |
| kv.second.last_durable_decree); |
| } |
| ri.valid_start_offset = kv.second.init_offset_in_shared_log; |
| gc_condition[kv.first] = ri; |
| } |
| |
| std::set<gpid> prevent_gc_replicas; |
| int reserved_log_count = _log->garbage_collection( |
| gc_condition, FLAGS_log_shared_file_count_limit, prevent_gc_replicas); |
| if (reserved_log_count > FLAGS_log_shared_file_count_limit * 2) { |
| LOG_INFO( |
| "gc_shared: trigger emergency checkpoint by FLAGS_log_shared_file_count_limit, " |
| "file_count_limit = {}, reserved_log_count = {}, trigger all replicas to do " |
| "checkpoint", |
| FLAGS_log_shared_file_count_limit, |
| reserved_log_count); |
| for (auto &kv : rs) { |
| tasking::enqueue( |
| LPC_PER_REPLICA_CHECKPOINT_TIMER, |
| kv.second.rep->tracker(), |
| std::bind(&replica_stub::trigger_checkpoint, this, kv.second.rep, true), |
| kv.first.thread_hash(), |
| std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2))); |
| } |
| } else if (reserved_log_count > FLAGS_log_shared_file_count_limit) { |
| std::ostringstream oss; |
| int c = 0; |
| for (auto &i : prevent_gc_replicas) { |
| if (c != 0) |
| oss << ", "; |
| oss << i.to_string(); |
| c++; |
| } |
| LOG_INFO( |
| "gc_shared: trigger emergency checkpoint by FLAGS_log_shared_file_count_limit, " |
| "file_count_limit = {}, reserved_log_count = {}, prevent_gc_replica_count = " |
| "{}, trigger them to do checkpoint: {}", |
| FLAGS_log_shared_file_count_limit, |
| reserved_log_count, |
| prevent_gc_replicas.size(), |
| oss.str()); |
| for (auto &id : prevent_gc_replicas) { |
| auto find = rs.find(id); |
| if (find != rs.end()) { |
| tasking::enqueue( |
| LPC_PER_REPLICA_CHECKPOINT_TIMER, |
| find->second.rep->tracker(), |
| std::bind(&replica_stub::trigger_checkpoint, this, find->second.rep, true), |
| id.thread_hash(), |
| std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2))); |
| } |
| } |
| } |
| |
| _counter_shared_log_size->set(_log->total_size() / (1024 * 1024)); |
| } |
| |
| // statistic learning info |
| uint64_t learning_count = 0; |
| uint64_t learning_max_duration_time_ms = 0; |
| uint64_t learning_max_copy_file_size = 0; |
| uint64_t cold_backup_running_count = 0; |
| uint64_t cold_backup_max_duration_time_ms = 0; |
| uint64_t cold_backup_max_upload_file_size = 0; |
| uint64_t bulk_load_running_count = 0; |
| uint64_t bulk_load_max_ingestion_time_ms = 0; |
| uint64_t bulk_load_max_duration_time_ms = 0; |
| uint64_t splitting_count = 0; |
| uint64_t splitting_max_duration_time_ms = 0; |
| uint64_t splitting_max_async_learn_time_ms = 0; |
| uint64_t splitting_max_copy_file_size = 0; |
| for (auto &kv : rs) { |
| replica_ptr &rep = kv.second.rep; |
| if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) { |
| learning_count++; |
| learning_max_duration_time_ms = std::max( |
| learning_max_duration_time_ms, rep->_potential_secondary_states.duration_ms()); |
| learning_max_copy_file_size = |
| std::max(learning_max_copy_file_size, |
| rep->_potential_secondary_states.learning_copy_file_size); |
| } |
| if (rep->status() == partition_status::PS_PRIMARY || |
| rep->status() == partition_status::PS_SECONDARY) { |
| cold_backup_running_count += rep->_cold_backup_running_count.load(); |
| cold_backup_max_duration_time_ms = std::max( |
| cold_backup_max_duration_time_ms, rep->_cold_backup_max_duration_time_ms.load()); |
| cold_backup_max_upload_file_size = std::max( |
| cold_backup_max_upload_file_size, rep->_cold_backup_max_upload_file_size.load()); |
| |
| if (rep->get_bulk_loader()->get_bulk_load_status() != bulk_load_status::BLS_INVALID) { |
| bulk_load_running_count++; |
| bulk_load_max_ingestion_time_ms = |
| std::max(bulk_load_max_ingestion_time_ms, rep->ingestion_duration_ms()); |
| bulk_load_max_duration_time_ms = |
| std::max(bulk_load_max_duration_time_ms, rep->get_bulk_loader()->duration_ms()); |
| } |
| } |
| // splitting_max_copy_file_size, rep->_split_states.copy_file_size |
| if (rep->status() == partition_status::PS_PARTITION_SPLIT) { |
| splitting_count++; |
| splitting_max_duration_time_ms = |
| std::max(splitting_max_duration_time_ms, rep->_split_states.total_ms()); |
| splitting_max_async_learn_time_ms = |
| std::max(splitting_max_async_learn_time_ms, rep->_split_states.async_learn_ms()); |
| splitting_max_copy_file_size = |
| std::max(splitting_max_copy_file_size, rep->_split_states.splitting_copy_file_size); |
| } |
| } |
| |
| _counter_replicas_learning_count->set(learning_count); |
| _counter_replicas_learning_max_duration_time_ms->set(learning_max_duration_time_ms); |
| _counter_replicas_learning_max_copy_file_size->set(learning_max_copy_file_size); |
| _counter_cold_backup_running_count->set(cold_backup_running_count); |
| _counter_cold_backup_max_duration_time_ms->set(cold_backup_max_duration_time_ms); |
| _counter_cold_backup_max_upload_file_size->set(cold_backup_max_upload_file_size); |
| _counter_bulk_load_running_count->set(bulk_load_running_count); |
| _counter_bulk_load_max_ingestion_time_ms->set(bulk_load_max_ingestion_time_ms); |
| _counter_bulk_load_max_duration_time_ms->set(bulk_load_max_duration_time_ms); |
| _counter_replicas_splitting_count->set(splitting_count); |
| _counter_replicas_splitting_max_duration_time_ms->set(splitting_max_duration_time_ms); |
| _counter_replicas_splitting_max_async_learn_time_ms->set(splitting_max_async_learn_time_ms); |
| _counter_replicas_splitting_max_copy_file_size->set(splitting_max_copy_file_size); |
| |
| LOG_INFO("finish to garbage collection, time_used_ns = {}", dsn_now_ns() - start); |
| } |
| |
| void replica_stub::on_disk_stat() |
| { |
| LOG_INFO("start to update disk stat"); |
| uint64_t start = dsn_now_ns(); |
| disk_cleaning_report report{}; |
| |
| dsn::replication::disk_remove_useless_dirs(_fs_manager.get_dir_nodes(), report); |
| _fs_manager.update_disk_stat(); |
| update_disk_holding_replicas(); |
| |
| _counter_replicas_error_replica_dir_count->set(report.error_replica_count); |
| _counter_replicas_garbage_replica_dir_count->set(report.garbage_replica_count); |
| _counter_replicas_tmp_replica_dir_count->set(report.disk_migrate_tmp_count); |
| _counter_replicas_origin_replica_dir_count->set(report.disk_migrate_origin_count); |
| _counter_replicas_recent_replica_remove_dir_count->add(report.remove_dir_count); |
| |
| LOG_INFO("finish to update disk stat, time_used_ns = {}", dsn_now_ns() - start); |
| } |
| |
task_ptr replica_stub::begin_open_replica(
    const app_info &app,
    gpid id,
    const std::shared_ptr<group_check_request> &group_check,
    const std::shared_ptr<configuration_update_request> &configuration_update)
{
    // Moves a replica into the "opening" state and schedules the actual open,
    // or resurrects a replica that is queued for close. Returns the async
    // open task, or nullptr when the replica is already opened/opening or is
    // closing and cannot be reopened.
    //
    // NOTE(review): the write lock is managed manually because some paths must
    // release it before invoking callbacks (see the dead-lock comment below).
    // Every exit path must be paired with an unlock_write().
    _replicas_lock.lock_write();

    if (_replicas.find(id) != _replicas.end()) {
        _replicas_lock.unlock_write();
        LOG_INFO("open replica '{}.{}' failed coz replica is already opened", app.app_type, id);
        return nullptr;
    }

    if (_opening_replicas.find(id) != _opening_replicas.end()) {
        _replicas_lock.unlock_write();
        LOG_INFO("open replica '{}.{}' failed coz replica is under opening", app.app_type, id);
        return nullptr;
    }

    auto it = _closing_replicas.find(id);
    if (it != _closing_replicas.end()) {
        task_ptr tsk = std::get<0>(it->second);
        replica_ptr rep = std::get<1>(it->second);
        // A replica queued for close can be "reopened" by cancelling the
        // pending close task, provided it is still PS_INACTIVE and the task
        // has not started running yet (cancel(false) does not wait).
        if (rep->status() == partition_status::PS_INACTIVE && tsk->cancel(false)) {
            // reopen it
            _closing_replicas.erase(it);
            _counter_replicas_closing_count->decrement();

            _replicas.emplace(id, rep);
            _counter_replicas_count->increment();

            _closed_replicas.erase(id);

            // unlock here to avoid dead lock
            _replicas_lock.unlock_write();

            LOG_INFO("open replica '{}.{}' which is to be closed, reopen it", app.app_type, id);

            // open by add learner
            if (group_check != nullptr) {
                on_add_learner(*group_check);
            }
        } else {
            _replicas_lock.unlock_write();
            LOG_INFO("open replica '{}.{}' failed coz replica is under closing", app.app_type, id);
        }
        return nullptr;
    }

    // Not opened/opening/closing: schedule the real open on the stub tracker.
    task_ptr task = tasking::enqueue(
        LPC_OPEN_REPLICA,
        &_tracker,
        std::bind(&replica_stub::open_replica, this, app, id, group_check, configuration_update));

    _opening_replicas[id] = task;
    _counter_replicas_opening_count->increment();
    _closed_replicas.erase(id);

    _replicas_lock.unlock_write();
    return task;
}
| |
void replica_stub::open_replica(
    const app_info &app,
    gpid id,
    const std::shared_ptr<group_check_request> &group_check,
    const std::shared_ptr<configuration_update_request> &configuration_update)
{
    // Opens one replica, trying in order:
    //   1. load from an existing on-disk replica dir;
    //   2. on load failure, revert a disk-migration `*.ori` dir and load that;
    //   3. otherwise create a brand-new replica (possibly restoring from cold
    //      backup or acting as a duplication follower).
    // On success the replica is registered in _replicas and the meta server is
    // notified; on failure the id is simply dropped from _opening_replicas.
    replica_ptr rep;
    std::string dir;
    auto dn = _fs_manager.find_replica_dir(app.app_type, id);
    if (dn != nullptr) {
        dir = dn->replica_dir(app.app_type, id);
        CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
        // NOTICE: if partition is DDD, and meta select one replica as primary, it will execute the
        // load-process because of a.b.pegasus is exist, so it will never execute the restore
        // process below
        LOG_INFO("{}@{}: start to load replica {} group check, dir = {}",
                 id,
                 _primary_address_str,
                 group_check ? "with" : "without",
                 dir);
        rep = load_replica(dir.c_str());

        // if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk
        // migration
        if (rep == nullptr) {
            const auto origin_dir_type =
                fmt::format("{}{}", app.app_type, replica_disk_migrator::kReplicaDirOriginSuffix);
            const auto origin_dn = _fs_manager.find_replica_dir(origin_dir_type, id);
            if (origin_dn != nullptr) {
                const auto origin_tmp_dir = origin_dn->replica_dir(origin_dir_type, id);
                CHECK(dsn::utils::filesystem::directory_exists(origin_tmp_dir),
                      "dir({}) not exist",
                      origin_tmp_dir);
                LOG_INFO("mark the dir {} as garbage, start revert and load disk migration origin "
                         "replica data({})",
                         dir,
                         origin_tmp_dir);
                // The broken target dir is renamed with the garbage suffix so
                // the disk-stat cleaner will remove it later.
                dsn::utils::filesystem::rename_path(dir,
                                                    fmt::format("{}{}", dir, kFolderSuffixGar));

                std::string origin_dir = origin_tmp_dir;
                // revert the origin replica dir
                boost::replace_first(
                    origin_dir, replica_disk_migrator::kReplicaDirOriginSuffix, "");
                dsn::utils::filesystem::rename_path(origin_tmp_dir, origin_dir);
                rep = load_replica(origin_dir.c_str());

                FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> void {});
            }
        }
    }

    if (rep == nullptr) {
        // NOTICE: if dir a.b.pegasus does not exist, or .app-info does not exist, but the ballot >
        // 0, or the last_committed_decree > 0, start replica will fail
        if ((configuration_update != nullptr) && (configuration_update->info.is_stateful)) {
            CHECK(configuration_update->config.ballot == 0 &&
                      configuration_update->config.last_committed_decree == 0,
                  "{}@{}: cannot load replica({}.{}), ballot = {}, "
                  "last_committed_decree = {}, but it does not existed!",
                  id.to_string(),
                  _primary_address_str,
                  id.to_string(),
                  app.app_type.c_str(),
                  configuration_update->config.ballot,
                  configuration_update->config.last_committed_decree);
        }

        // NOTICE: only new_replica_group's assign_primary will execute this; if server restart when
        // download restore-data from cold backup media, the a.b.pegasus will move to
        // a.b.pegasus.timestamp.err when replica-server load all the replicas, so restore-flow will
        // do it again

        // Cold-backup restore applies only when assigned primary and the app
        // carries a backup policy env.
        bool restore_if_necessary =
            ((configuration_update != nullptr) &&
             (configuration_update->type == config_type::CT_ASSIGN_PRIMARY) &&
             (app.envs.find(backup_restore_constant::POLICY_NAME) != app.envs.end()));

        // Duplication follower applies only when assigned primary and both
        // master-cluster and master-metas envs are present.
        bool is_duplication_follower =
            ((configuration_update != nullptr) &&
             (configuration_update->type == config_type::CT_ASSIGN_PRIMARY) &&
             (app.envs.find(duplication_constants::kDuplicationEnvMasterClusterKey) !=
              app.envs.end()) &&
             (app.envs.find(duplication_constants::kDuplicationEnvMasterMetasKey) !=
              app.envs.end()));

        // NOTICE: when we don't need execute restore-process, we should remove a.b.pegasus
        // directory because it don't contain the valid data dir and also we need create a new
        // replica(if contain valid data, it will execute load-process)

        if (!restore_if_necessary && ::dsn::utils::filesystem::directory_exists(dir)) {
            CHECK(::dsn::utils::filesystem::remove_path(dir),
                  "remove useless directory({}) failed",
                  dir);
        }
        rep = new_replica(id, app, restore_if_necessary, is_duplication_follower);
    }

    if (rep == nullptr) {
        LOG_INFO(
            "{}@{}: open replica failed, erase from opening replicas", id, _primary_address_str);
        zauto_write_lock l(_replicas_lock);
        CHECK_GT_MSG(_opening_replicas.erase(id),
                     0,
                     "replica {} is not in _opening_replicas",
                     id.to_string());
        _counter_replicas_opening_count->decrement();
        return;
    }

    {
        // Move the replica from "opening" to "open" under the write lock.
        zauto_write_lock l(_replicas_lock);
        CHECK_GT_MSG(_opening_replicas.erase(id),
                     0,
                     "replica {} is not in _opening_replicas",
                     id.to_string());
        _counter_replicas_opening_count->decrement();

        CHECK(_replicas.find(id) == _replicas.end(),
              "replica {} is already in _replicas",
              id.to_string());
        _replicas.insert(replicas::value_type(rep->get_gpid(), rep));
        _counter_replicas_count->increment();

        _closed_replicas.erase(id);
    }

    // Notify ourselves (one-way, via the local rpc address) so the open is
    // followed by the requested membership action.
    if (nullptr != group_check) {
        rpc::call_one_way_typed(_primary_address,
                                RPC_LEARN_ADD_LEARNER,
                                *group_check,
                                group_check->config.pid.thread_hash());
    } else if (nullptr != configuration_update) {
        rpc::call_one_way_typed(_primary_address,
                                RPC_CONFIG_PROPOSAL,
                                *configuration_update,
                                configuration_update->config.pid.thread_hash());
    }
}
| |
| replica *replica_stub::new_replica(gpid gpid, |
| const app_info &app, |
| bool restore_if_necessary, |
| bool is_duplication_follower, |
| const std::string &parent_dir) |
| { |
| dir_node *dn = nullptr; |
| if (parent_dir.empty()) { |
| dn = _fs_manager.create_replica_dir_if_necessary(app.app_type, gpid); |
| } else { |
| dn = _fs_manager.create_child_replica_dir(app.app_type, gpid, parent_dir); |
| } |
| if (dn == nullptr) { |
| LOG_ERROR("could not allocate a new directory for replica {}", gpid); |
| return nullptr; |
| } |
| const auto &dir = dn->replica_dir(app.app_type, gpid); |
| CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir); |
| auto *rep = |
| new replica(this, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower); |
| error_code err; |
| if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) { |
| LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err); |
| clear_on_failure(rep, dir, gpid); |
| return nullptr; |
| } |
| |
| if (is_duplication_follower && |
| (err = rep->get_replica_follower()->duplicate_checkpoint()) != dsn::ERR_OK) { |
| LOG_ERROR("{}: try to duplicate replica checkpoint failed, error({}) and please check " |
| "previous detail error log", |
| rep->name(), |
| err); |
| clear_on_failure(rep, dir, gpid); |
| return nullptr; |
| } |
| |
| err = rep->initialize_on_new(); |
| if (err != ERR_OK) { |
| LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err); |
| clear_on_failure(rep, dir, gpid); |
| return nullptr; |
| } |
| |
| LOG_DEBUG("{}: new replica succeed", rep->name()); |
| return rep; |
| } |
| |
| replica *replica_stub::load_replica(const char *dir) |
| { |
| FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; }); |
| |
| char splitters[] = {'\\', '/', 0}; |
| std::string name = utils::get_last_component(std::string(dir), splitters); |
| if (name.empty()) { |
| LOG_ERROR("invalid replica dir {}", dir); |
| return nullptr; |
| } |
| |
| char app_type[128]; |
| int32_t app_id, pidx; |
| if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) { |
| LOG_ERROR("invalid replica dir {}", dir); |
| return nullptr; |
| } |
| |
| gpid pid(app_id, pidx); |
| if (!utils::filesystem::directory_exists(dir)) { |
| LOG_ERROR("replica dir {} not exist", dir); |
| return nullptr; |
| } |
| |
| dsn::app_info info; |
| replica_app_info info2(&info); |
| std::string path = utils::filesystem::path_combine(dir, replica::kAppInfo); |
| auto err = info2.load(path); |
| if (ERR_OK != err) { |
| LOG_ERROR("load app-info from {} failed, err = {}", path, err); |
| return nullptr; |
| } |
| |
| if (info.app_type != app_type) { |
| LOG_ERROR("unmatched app type {} for {}", info.app_type, path); |
| return nullptr; |
| } |
| |
| if (info.partition_count < pidx) { |
| LOG_ERROR("partition[{}], count={}, this replica may be partition split garbage partition, " |
| "ignore it", |
| pid, |
| info.partition_count); |
| return nullptr; |
| } |
| |
| auto *rep = new replica(this, pid, info, dir, false); |
| err = rep->initialize_on_load(); |
| if (err != ERR_OK) { |
| LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err); |
| rep->close(); |
| delete rep; |
| rep = nullptr; |
| |
| // clear work on failure |
| if (dsn::utils::filesystem::directory_exists(dir)) { |
| move_to_err_path(dir, "load replica"); |
| _counter_replicas_recent_replica_move_error_count->increment(); |
| _fs_manager.remove_replica(pid); |
| } |
| |
| return nullptr; |
| } |
| |
| LOG_INFO("{}: load replica succeed", rep->name()); |
| return rep; |
| } |
| |
| void replica_stub::clear_on_failure(replica *rep, const std::string &path, const gpid &pid) |
| { |
| rep->close(); |
| delete rep; |
| rep = nullptr; |
| |
| // clear work on failure |
| utils::filesystem::remove_path(path); |
| _fs_manager.remove_replica(pid); |
| } |
| |
task_ptr replica_stub::begin_close_replica(replica_ptr r)
{
    // Moves a replica from _replicas into _closing_replicas and schedules the
    // actual close. Returns the close task, or nullptr if the replica was
    // already removed from _replicas. PS_INACTIVE replicas get a grace delay
    // so they can be cheaply reopened via begin_open_replica().
    CHECK(r->status() == partition_status::PS_ERROR ||
              r->status() == partition_status::PS_INACTIVE ||
              r->disk_migrator()->status() >= disk_migration_status::MOVED,
          "invalid state(partition_status={}, migration_status={}) when calling "
          "replica({}) close",
          enum_to_string(r->status()),
          enum_to_string(r->disk_migrator()->status()),
          r->name());

    gpid id = r->get_gpid();

    zauto_write_lock l(_replicas_lock);
    if (_replicas.erase(id) == 0) {
        return nullptr;
    }

    _counter_replicas_count->decrement();

    int delay_ms = 0;
    if (r->status() == partition_status::PS_INACTIVE) {
        delay_ms = FLAGS_gc_memory_replica_interval_ms;
        LOG_INFO("{}: delay {} milliseconds to close replica, status = PS_INACTIVE",
                 r->name(),
                 delay_ms);
    }

    // Snapshot app/replica info now; close_replica() publishes them into
    // _closed_replicas once the close completes.
    app_info a_info = *(r->get_app_info());
    replica_info r_info;
    get_replica_info(r_info, r);
    task_ptr task = tasking::enqueue(LPC_CLOSE_REPLICA,
                                     &_tracker,
                                     [=]() { close_replica(r); },
                                     0,
                                     std::chrono::milliseconds(delay_ms));
    _closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info));
    _counter_replicas_closing_count->increment();
    return task;
}
| |
| void replica_stub::close_replica(replica_ptr r) |
| { |
| LOG_INFO("{}: start to close replica", r->name()); |
| |
| gpid id = r->get_gpid(); |
| std::string name = r->name(); |
| |
| r->close(); |
| |
| { |
| zauto_write_lock l(_replicas_lock); |
| auto find = _closing_replicas.find(id); |
| CHECK(find != _closing_replicas.end(), "replica {} is not in _closing_replicas", name); |
| _closed_replicas.emplace( |
| id, std::make_pair(std::get<2>(find->second), std::get<3>(find->second))); |
| _closing_replicas.erase(find); |
| _counter_replicas_closing_count->decrement(); |
| } |
| |
| if (r->is_data_corrupted()) { |
| _fs_manager.remove_replica(id); |
| move_to_err_path(r->dir(), "trash replica"); |
| _counter_replicas_recent_replica_move_error_count->increment(); |
| } |
| |
| LOG_INFO("{}: finish to close replica", name); |
| } |
| |
| void replica_stub::notify_replica_state_update(const replica_configuration &config, bool is_closing) |
| { |
| if (nullptr != _replica_state_subscriber) { |
| if (_is_long_subscriber) { |
| tasking::enqueue( |
| LPC_REPLICA_STATE_CHANGE_NOTIFICATION, |
| &_tracker, |
| std::bind(_replica_state_subscriber, _primary_address, config, is_closing)); |
| } else { |
| _replica_state_subscriber(_primary_address, config, is_closing); |
| } |
| } |
| } |
| |
void replica_stub::trigger_checkpoint(replica_ptr r, bool is_emergency)
{
    // Ask the replica to start a checkpoint; when `is_emergency' is true the
    // underlying storage should flush its memtable as soon as possible.
    r->init_checkpoint(is_emergency);
}
| |
void replica_stub::handle_log_failure(error_code err)
{
    // A shared-log write failure is treated as fatal: the CHECK aborts the
    // process unless the test-only s_not_exit_on_log_failure flag is set.
    LOG_ERROR("handle log failure: {}", err);
    CHECK(s_not_exit_on_log_failure, "");
}
| |
void replica_stub::open_service()
{
    // Registers every RPC the replica server handles. Grouped roughly by
    // feature: replication core, queries, disk, partition split, bulk load,
    // hotkey detection, and NFS file transfer.

    // Replication core: config proposals, 2PC prepare, and learning.
    register_rpc_handler(RPC_CONFIG_PROPOSAL, "ProposeConfig", &replica_stub::on_config_proposal);
    register_rpc_handler(RPC_PREPARE, "prepare", &replica_stub::on_prepare);
    register_rpc_handler(RPC_LEARN, "Learn", &replica_stub::on_learn);
    register_rpc_handler_with_rpc_holder(RPC_LEARN_COMPLETION_NOTIFY,
                                         "LearnNotify",
                                         &replica_stub::on_learn_completion_notification);
    register_rpc_handler(RPC_LEARN_ADD_LEARNER, "LearnAdd", &replica_stub::on_add_learner);
    register_rpc_handler(RPC_REMOVE_REPLICA, "remove", &replica_stub::on_remove);
    register_rpc_handler_with_rpc_holder(
        RPC_GROUP_CHECK, "GroupCheck", &replica_stub::on_group_check);

    // Queries: decree, replica/app info, checkpoints, disk info.
    register_rpc_handler_with_rpc_holder(
        RPC_QUERY_PN_DECREE, "query_decree", &replica_stub::on_query_decree);
    register_rpc_handler_with_rpc_holder(
        RPC_QUERY_REPLICA_INFO, "query_replica_info", &replica_stub::on_query_replica_info);
    register_rpc_handler_with_rpc_holder(RPC_QUERY_LAST_CHECKPOINT_INFO,
                                         "query_last_checkpoint_info",
                                         &replica_stub::on_query_last_checkpoint);
    register_rpc_handler_with_rpc_holder(
        RPC_QUERY_DISK_INFO, "query_disk_info", &replica_stub::on_query_disk_info);
    register_rpc_handler_with_rpc_holder(
        RPC_REPLICA_DISK_MIGRATE, "disk_migrate_replica", &replica_stub::on_disk_migrate);
    register_rpc_handler_with_rpc_holder(
        RPC_QUERY_APP_INFO, "query_app_info", &replica_stub::on_query_app_info);

    // Partition split.
    register_rpc_handler_with_rpc_holder(RPC_SPLIT_UPDATE_CHILD_PARTITION_COUNT,
                                         "update_child_group_partition_count",
                                         &replica_stub::on_update_child_group_partition_count);
    register_rpc_handler_with_rpc_holder(RPC_SPLIT_NOTIFY_CATCH_UP,
                                         "child_notify_catch_up",
                                         &replica_stub::on_notify_primary_split_catch_up);

    // Bulk load, hotkey detection, disk management.
    register_rpc_handler_with_rpc_holder(RPC_BULK_LOAD, "bulk_load", &replica_stub::on_bulk_load);
    register_rpc_handler_with_rpc_holder(
        RPC_GROUP_BULK_LOAD, "group_bulk_load", &replica_stub::on_group_bulk_load);
    register_rpc_handler_with_rpc_holder(
        RPC_DETECT_HOTKEY, "detect_hotkey", &replica_stub::on_detect_hotkey);
    register_rpc_handler_with_rpc_holder(
        RPC_ADD_NEW_DISK, "add_new_disk", &replica_stub::on_add_new_disk);

    // nfs
    register_async_rpc_handler(dsn::service::RPC_NFS_COPY, "copy", &replica_stub::on_nfs_copy);
    register_async_rpc_handler(
        dsn::service::RPC_NFS_GET_FILE_SIZE, "get_file_size", &replica_stub::on_nfs_get_file_size);

    register_ctrl_command();
}
| |
#if !defined(DSN_ENABLE_GPERF) && defined(DSN_USE_JEMALLOC)
void replica_stub::register_jemalloc_ctrl_command()
{
    // Remote command that dumps jemalloc statistics. First argument selects
    // the stats type; an optional second argument overrides the dump buffer
    // size.
    _cmds.emplace_back(::dsn::command_manager::instance().register_command(
        {"replica.dump-jemalloc-stats"},
        fmt::format("replica.dump-jemalloc-stats <{}> [buffer size]", kAllJeStatsTypesStr),
        "dump stats of jemalloc",
        [](const std::vector<std::string> &args) {
            if (args.empty()) {
                return std::string("invalid arguments");
            }

            const auto stats_type = enum_from_string(args[0].c_str(), je_stats_type::INVALID);
            if (stats_type == je_stats_type::INVALID) {
                return std::string("invalid stats type");
            }

            std::string stats("\n");

            if (args.size() > 1) {
                // Explicit buffer size given: parse and use it.
                uint64_t buf_sz;
                if (!dsn::buf2uint64(args[1], buf_sz)) {
                    return std::string("invalid buffer size");
                }
                dsn::je_dump_stats(stats_type, static_cast<size_t>(buf_sz), stats);
            } else {
                // No buffer size: dump with the default buffer.
                dsn::je_dump_stats(stats_type, stats);
            }
            return stats;
        }));
}
#endif
| |
void replica_stub::register_ctrl_command()
{
    /// In simple_kv test, three replica apps are created, which means that three replica_stubs are
    /// initialized in simple_kv test. If we don't use std::call_once, these command are registered
    /// for three times. And in command_manager, one same command is not allowed to be registered
    /// more than twice times. That is why we use std::call_once here. Same situation in
    /// failure_detector::register_ctrl_commands and nfs_client_impl::register_cli_commands
    static std::once_flag flag;
    std::call_once(flag, [&]() {
        // Kill all partitions, all partitions of one app, or a single
        // partition, depending on how many arguments were given.
        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.kill_partition"},
            "replica.kill_partition [app_id [partition_index]]",
            "replica.kill_partition: kill partitions by (all, one app, one partition)",
            [this](const std::vector<std::string> &args) {
                dsn::gpid pid;
                if (args.size() == 0) {
                    // no args: kill everything (-1 acts as a wildcard)
                    pid.set_app_id(-1);
                    pid.set_partition_index(-1);
                } else if (args.size() == 1) {
                    pid.set_app_id(atoi(args[0].c_str()));
                    pid.set_partition_index(-1);
                } else if (args.size() == 2) {
                    pid.set_app_id(atoi(args[0].c_str()));
                    pid.set_partition_index(atoi(args[1].c_str()));
                } else {
                    return std::string(ERR_INVALID_PARAMETERS.to_string());
                }
                dsn::error_code e = this->on_kill_replica(pid);
                return std::string(e.to_string());
            }));

        // Toggle rejection of all client read & write requests.
        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.deny-client"},
            "replica.deny-client <true|false>",
            "replica.deny-client - control if deny client read & write request",
            [this](const std::vector<std::string> &args) {
                return remote_command_set_bool_flag(_deny_client, "deny-client", args);
            }));

        // Toggle verbose error logging for client request replies.
        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.verbose-client-log"},
            "replica.verbose-client-log <true|false>",
            "replica.verbose-client-log - control if print verbose error log when reply read & "
            "write request",
            [this](const std::vector<std::string> &args) {
                return remote_command_set_bool_flag(
                    _verbose_client_log, "verbose-client-log", args);
            }));

        // Toggle verbose logging on mutation commit.
        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.verbose-commit-log"},
            "replica.verbose-commit-log <true|false>",
            "replica.verbose-commit-log - control if print verbose log when commit mutation",
            [this](const std::vector<std::string> &args) {
                return remote_command_set_bool_flag(
                    _verbose_commit_log, "verbose-commit-log", args);
            }));

        // Trigger an emergency checkpoint on the selected replicas.
        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.trigger-checkpoint"},
            "replica.trigger-checkpoint [id1,id2,...] (where id is 'app_id' or "
            "'app_id.partition_id')",
            "replica.trigger-checkpoint - trigger replicas to do checkpoint",
            [this](const std::vector<std::string> &args) {
                return exec_command_on_replica(args, true, [this](const replica_ptr &rep) {
                    tasking::enqueue(LPC_PER_REPLICA_CHECKPOINT_TIMER,
                                     rep->tracker(),
                                     std::bind(&replica_stub::trigger_checkpoint, this, rep, true),
                                     rep->get_gpid().thread_hash());
                    return std::string("triggered");
                });
            }));

        // Query manual-compaction state of the selected replicas.
        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.query-compact"},
            "replica.query-compact [id1,id2,...] (where id is 'app_id' or 'app_id.partition_id')",
            "replica.query-compact - query full compact status on the underlying storage engine",
            [this](const std::vector<std::string> &args) {
                return exec_command_on_replica(args, true, [](const replica_ptr &rep) {
                    return rep->query_manual_compact_state();
                });
            }));

        // Query app envs of the selected replicas.
        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.query-app-envs"},
            "replica.query-app-envs [id1,id2,...] (where id is 'app_id' or 'app_id.partition_id')",
            "replica.query-app-envs - query app envs on the underlying storage engine",
            [this](const std::vector<std::string> &args) {
                return exec_command_on_replica(args, true, [](const replica_ptr &rep) {
                    std::map<std::string, std::string> kv_map;
                    rep->query_app_envs(kv_map);
                    return dsn::utils::kv_map_to_string(kv_map, ',', '=');
                });
            }));

#ifdef DSN_ENABLE_GPERF
        // tcmalloc-specific commands (only when built with gperftools).
        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.release-tcmalloc-memory"},
            "replica.release-tcmalloc-memory <true|false>",
            "replica.release-tcmalloc-memory - control if try to release tcmalloc memory",
            [this](const std::vector<std::string> &args) {
                return remote_command_set_bool_flag(
                    _release_tcmalloc_memory, "release-tcmalloc-memory", args);
            }));

        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.get-tcmalloc-status"},
            "replica.get-tcmalloc-status - get status of tcmalloc",
            "get status of tcmalloc",
            [](const std::vector<std::string> &args) {
                char buf[4096];
                MallocExtension::instance()->GetStats(buf, 4096);
                return std::string(buf);
            }));

        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.mem-release-max-reserved-percentage"},
            "replica.mem-release-max-reserved-percentage [num | DEFAULT]",
            "control tcmalloc max reserved but not-used memory percentage",
            [this](const std::vector<std::string> &args) {
                std::string result("OK");
                if (args.empty()) {
                    // show current value
                    result = "mem-release-max-reserved-percentage = " +
                             std::to_string(_mem_release_max_reserved_mem_percentage);
                    return result;
                }
                if (args[0] == "DEFAULT") {
                    // set to default value
                    _mem_release_max_reserved_mem_percentage =
                        FLAGS_mem_release_max_reserved_mem_percentage;
                    return result;
                }
                int32_t percentage = 0;
                if (!dsn::buf2int32(args[0], percentage) || percentage <= 0 || percentage > 100) {
                    result = std::string("ERR: invalid arguments");
                } else {
                    _mem_release_max_reserved_mem_percentage = percentage;
                }
                return result;
            }));

        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.release-all-reserved-memory"},
            "replica.release-all-reserved-memory - release tcmalloc all reserved-not-used memory",
            "release tcmalloc all reserverd not-used memory back to operating system",
            [this](const std::vector<std::string> &args) {
                auto release_bytes = gc_tcmalloc_memory(true);
                return "OK, release_bytes=" + std::to_string(release_bytes);
            }));
#elif defined(DSN_USE_JEMALLOC)
        register_jemalloc_ctrl_command();
#endif
        // TODO(yingchun): use http
        // Show or set the max number of concurrent bulk-load downloads.
        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
            {"replica.max-concurrent-bulk-load-downloading-count"},
            "replica.max-concurrent-bulk-load-downloading-count [num | DEFAULT]",
            "control stub max_concurrent_bulk_load_downloading_count",
            [this](const std::vector<std::string> &args) {
                std::string result("OK");
                if (args.empty()) {
                    // no args: show the current value
                    result = "max_concurrent_bulk_load_downloading_count=" +
                             std::to_string(_max_concurrent_bulk_load_downloading_count);
                    return result;
                }

                if (args[0] == "DEFAULT") {
                    _max_concurrent_bulk_load_downloading_count =
                        _options.max_concurrent_bulk_load_downloading_count;
                    return result;
                }

                int32_t count = 0;
                if (!dsn::buf2int32(args[0], count) || count <= 0) {
                    result = std::string("ERR: invalid arguments");
                } else {
                    _max_concurrent_bulk_load_downloading_count = count;
                }
                return result;
            }));
    });
}
| |
| std::string |
| replica_stub::exec_command_on_replica(const std::vector<std::string> &args, |
| bool allow_empty_args, |
| std::function<std::string(const replica_ptr &rep)> func) |
| { |
| if (!allow_empty_args && args.empty()) { |
| return std::string("invalid arguments"); |
| } |
| |
| replicas rs; |
| { |
| zauto_read_lock l(_replicas_lock); |
| rs = _replicas; |
| } |
| |
| std::set<gpid> required_ids; |
| replicas choosed_rs; |
| if (!args.empty()) { |
| for (int i = 0; i < args.size(); i++) { |
| std::vector<std::string> arg_strs; |
| utils::split_args(args[i].c_str(), arg_strs, ','); |
| if (arg_strs.empty()) { |
| return std::string("invalid arguments"); |
| } |
| |
| for (const std::string &arg : arg_strs) { |
| if (arg.empty()) |
| continue; |
| gpid id; |
| int pid; |
| if (id.parse_from(arg.c_str())) { |
| // app_id.partition_index |
| required_ids.insert(id); |
| auto find = rs.find(id); |
| if (find != rs.end()) { |
| choosed_rs[id] = find->second; |
| } |
| } else if (sscanf(arg.c_str(), "%d", &pid) == 1) { |
| // app_id |
| for (auto kv : rs) { |
| id = kv.second->get_gpid(); |
| if (id.get_app_id() == pid) { |
| choosed_rs[id] = kv.second; |
| } |
| } |
| } else { |
| return std::string("invalid arguments"); |
| } |
| } |
| } |
| } else { |
| // all replicas |
| choosed_rs = rs; |
| } |
| |
| std::vector<task_ptr> tasks; |
| ::dsn::zlock results_lock; |
| std::map<gpid, std::pair<partition_status::type, std::string>> results; // id => status,result |
| for (auto &kv : choosed_rs) { |
| replica_ptr rep = kv.second; |
| task_ptr tsk = tasking::enqueue(LPC_EXEC_COMMAND_ON_REPLICA, |
| rep->tracker(), |
| [rep, &func, &results_lock, &results]() { |
| partition_status::type status = rep->status(); |
| if (status != partition_status::PS_PRIMARY && |
| status != partition_status::PS_SECONDARY) |
| return; |
| std::string result = func(rep); |
| ::dsn::zauto_lock l(results_lock); |
| auto &value = results[rep->get_gpid()]; |
| value.first = status; |
| value.second = result; |
| }, |
| rep->get_gpid().thread_hash()); |
| tasks.emplace_back(std::move(tsk)); |
| } |
| |
| for (auto &tsk : tasks) { |
| tsk->wait(); |
| } |
| |
| int processed = results.size(); |
| int not_found = 0; |
| for (auto &id : required_ids) { |
| if (results.find(id) == results.end()) { |
| auto &value = results[id]; |
| value.first = partition_status::PS_INVALID; |
| value.second = "not found"; |
| not_found++; |
| } |
| } |
| |
| std::stringstream query_state; |
| query_state << processed << " processed, " << not_found << " not found"; |
| for (auto &kv : results) { |
| query_state << "\n " << kv.first.to_string() << "@" << _primary_address_str; |
| if (kv.second.first != partition_status::PS_INVALID) |
| query_state << "@" << (kv.second.first == partition_status::PS_PRIMARY ? "P" : "S"); |
| query_state << " : " << kv.second.second; |
| } |
| |
| return query_state.str(); |
| } |
| |
void replica_stub::close()
{
    // Shut down the stub: cancel outstanding tasks and timers, wait for
    // closing replicas to finish, then close every open replica.
    // Idempotent: returns immediately if the stub is not running or the
    // command handlers were already cleared.
    if (!_is_running) {
        return;
    }

    _tracker.cancel_outstanding_tasks();

    // this replica may not be opened
    // or is already closed by calling tool_app::stop_all_apps()
    // in this case, just return
    if (_cmds.empty()) {
        return;
    }
    // Unregister the remote commands first so no new command can race with
    // the shutdown below.
    _cmds.clear();

    // Cancel all periodic/background tasks; cancel(true) waits for a running
    // instance of the task to complete before returning.
    if (_config_sync_timer_task != nullptr) {
        _config_sync_timer_task->cancel(true);
        _config_sync_timer_task = nullptr;
    }

    if (_duplication_sync_timer != nullptr) {
        _duplication_sync_timer->close();
        _duplication_sync_timer = nullptr;
    }

    if (_config_query_task != nullptr) {
        _config_query_task->cancel(true);
        _config_query_task = nullptr;
    }
    _state = NS_Disconnected;

    if (_disk_stat_timer_task != nullptr) {
        _disk_stat_timer_task->cancel(true);
        _disk_stat_timer_task = nullptr;
    }

    if (_gc_timer_task != nullptr) {
        _gc_timer_task->cancel(true);
        _gc_timer_task = nullptr;
    }

    if (_mem_release_timer_task != nullptr) {
        _mem_release_timer_task->cancel(true);
        _mem_release_timer_task = nullptr;
    }

    wait_closing_replicas_finished();

    {
        zauto_write_lock l(_replicas_lock);

        // Cancel replicas that are still being opened. The write lock is
        // dropped around cancel(true) because the open task may itself need
        // '_replicas_lock' to make progress.
        while (!_opening_replicas.empty()) {
            task_ptr task = _opening_replicas.begin()->second;
            _replicas_lock.unlock_write();

            task->cancel(true);

            _counter_replicas_opening_count->decrement();
            _replicas_lock.lock_write();
            _opening_replicas.erase(_opening_replicas.begin());
        }

        // Close every opened replica and drop it from the map.
        while (!_replicas.empty()) {
            _replicas.begin()->second->close();

            _counter_replicas_count->decrement();
            _replicas.erase(_replicas.begin());
        }
    }
    _is_running = false;
}
| |
| #ifdef DSN_ENABLE_GPERF |
| // Get tcmalloc numeric property (name is "prop") value. |
| // Return -1 if get property failed (property we used will be greater than zero) |
| // Properties can be found in 'gperftools/malloc_extension.h' |
| static int64_t get_tcmalloc_numeric_property(const char *prop) |
| { |
| size_t value; |
| if (!::MallocExtension::instance()->GetNumericProperty(prop, &value)) { |
| LOG_ERROR("Failed to get tcmalloc property {}", prop); |
| return -1; |
| } |
| return value; |
| } |
| |
| uint64_t replica_stub::gc_tcmalloc_memory(bool release_all) |
| { |
| auto tcmalloc_released_bytes = 0; |
| if (!_release_tcmalloc_memory) { |
| _is_releasing_memory.store(false); |
| _counter_tcmalloc_release_memory_size->set(tcmalloc_released_bytes); |
| return tcmalloc_released_bytes; |
| } |
| |
| if (_is_releasing_memory.load()) { |
| LOG_WARNING("This node is releasing memory..."); |
| return tcmalloc_released_bytes; |
| } |
| |
| _is_releasing_memory.store(true); |
| int64_t total_allocated_bytes = |
| get_tcmalloc_numeric_property("generic.current_allocated_bytes"); |
| int64_t reserved_bytes = get_tcmalloc_numeric_property("tcmalloc.pageheap_free_bytes"); |
| if (total_allocated_bytes == -1 || reserved_bytes == -1) { |
| return tcmalloc_released_bytes; |
| } |
| |
| int64_t max_reserved_bytes = |
| release_all ? 0 |
| : (total_allocated_bytes * _mem_release_max_reserved_mem_percentage / 100.0); |
| if (reserved_bytes > max_reserved_bytes) { |
| int64_t release_bytes = reserved_bytes - max_reserved_bytes; |
| tcmalloc_released_bytes = release_bytes; |
| LOG_INFO("Memory release started, almost {} bytes will be released", release_bytes); |
| while (release_bytes > 0) { |
| // tcmalloc releasing memory will lock page heap, release 1MB at a time to avoid locking |
| // page heap for long time |
| ::MallocExtension::instance()->ReleaseToSystem(1024 * 1024); |
| release_bytes -= 1024 * 1024; |
| } |
| } |
| _counter_tcmalloc_release_memory_size->set(tcmalloc_released_bytes); |
| _is_releasing_memory.store(false); |
| return tcmalloc_released_bytes; |
| } |
| #endif |
| |
| // |
| // partition split |
| // |
| void replica_stub::create_child_replica(rpc_address primary_address, |
| app_info app, |
| ballot init_ballot, |
| gpid child_gpid, |
| gpid parent_gpid, |
| const std::string &parent_dir) |
| { |
| replica_ptr child_replica = create_child_replica_if_not_found(child_gpid, &app, parent_dir); |
| if (child_replica != nullptr) { |
| LOG_INFO("app({}), create child replica ({}) succeed", app.app_name, child_gpid); |
| tasking::enqueue(LPC_PARTITION_SPLIT, |
| child_replica->tracker(), |
| std::bind(&replica_split_manager::child_init_replica, |
| child_replica->get_split_manager(), |
| parent_gpid, |
| primary_address, |
| init_ballot), |
| child_gpid.thread_hash()); |
| } else { |
| LOG_WARNING("failed to create child replica ({}), ignore it and wait next run", child_gpid); |
| split_replica_error_handler( |
| parent_gpid, |
| std::bind(&replica_split_manager::parent_cleanup_split_context, std::placeholders::_1)); |
| } |
| } |
| |
| replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid, |
| app_info *app, |
| const std::string &parent_dir) |
| { |
| FAIL_POINT_INJECT_F("replica_stub_create_child_replica_if_not_found", |
| [=](dsn::string_view) -> replica_ptr { |
| replica *rep = new replica(this, child_pid, *app, "./", false); |
| rep->_config.status = partition_status::PS_INACTIVE; |
| _replicas.insert(replicas::value_type(child_pid, rep)); |
| LOG_INFO("mock create_child_replica_if_not_found succeed"); |
| return rep; |
| }); |
| |
| zauto_write_lock l(_replicas_lock); |
| auto it = _replicas.find(child_pid); |
| if (it != _replicas.end()) { |
| return it->second; |
| } else { |
| if (_opening_replicas.find(child_pid) != _opening_replicas.end()) { |
| LOG_WARNING("failed create child replica({}) because it is under open", child_pid); |
| return nullptr; |
| } else if (_closing_replicas.find(child_pid) != _closing_replicas.end()) { |
| LOG_WARNING("failed create child replica({}) because it is under close", child_pid); |
| return nullptr; |
| } else { |
| replica *rep = new_replica(child_pid, *app, false, false, parent_dir); |
| if (rep != nullptr) { |
| auto pr = _replicas.insert(replicas::value_type(child_pid, rep)); |
| CHECK(pr.second, "child replica {} has been existed", rep->name()); |
| _counter_replicas_count->increment(); |
| _closed_replicas.erase(child_pid); |
| } |
| return rep; |
| } |
| } |
| } |
| |
// ThreadPool: THREAD_POOL_REPLICATION
void replica_stub::split_replica_error_handler(gpid pid, local_execution handler)
{
    // Convenience wrapper: run `handler` on the split manager of replica `pid`
    // under the LPC_PARTITION_SPLIT_ERROR task code; a no-op (with a warning
    // logged by split_replica_exec) if the replica no longer exists.
    split_replica_exec(LPC_PARTITION_SPLIT_ERROR, pid, handler);
}
| |
| // ThreadPool: THREAD_POOL_REPLICATION |
| dsn::error_code |
| replica_stub::split_replica_exec(dsn::task_code code, gpid pid, local_execution handler) |
| { |
| FAIL_POINT_INJECT_F("replica_stub_split_replica_exec", [](dsn::string_view) { return ERR_OK; }); |
| replica_ptr replica = pid.get_app_id() == 0 ? nullptr : get_replica(pid); |
| if (replica && handler) { |
| tasking::enqueue(code, |
| replica.get()->tracker(), |
| [handler, replica]() { handler(replica->get_split_manager()); }, |
| pid.thread_hash()); |
| return ERR_OK; |
| } |
| LOG_WARNING("replica({}) is invalid", pid); |
| return ERR_OBJECT_NOT_FOUND; |
| } |
| |
| // ThreadPool: THREAD_POOL_REPLICATION |
| void replica_stub::on_notify_primary_split_catch_up(notify_catch_up_rpc rpc) |
| { |
| const notify_catch_up_request &request = rpc.request(); |
| notify_cacth_up_response &response = rpc.response(); |
| replica_ptr replica = get_replica(request.parent_gpid); |
| if (replica != nullptr) { |
| replica->get_split_manager()->parent_handle_child_catch_up(request, response); |
| } else { |
| response.err = ERR_OBJECT_NOT_FOUND; |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_REPLICATION |
| void replica_stub::on_update_child_group_partition_count(update_child_group_partition_count_rpc rpc) |
| { |
| const auto &request = rpc.request(); |
| auto &response = rpc.response(); |
| replica_ptr replica = get_replica(request.child_pid); |
| if (replica != nullptr) { |
| replica->get_split_manager()->on_update_child_group_partition_count(request, response); |
| } else { |
| response.err = ERR_OBJECT_NOT_FOUND; |
| } |
| } |
| |
| void replica_stub::update_disk_holding_replicas() |
| { |
| for (const auto &dn : _fs_manager.get_dir_nodes()) { |
| dn->holding_primary_replicas.clear(); |
| dn->holding_secondary_replicas.clear(); |
| for (const auto &holding_replicas : dn->holding_replicas) { |
| const auto &pids = holding_replicas.second; |
| for (const auto &pid : pids) { |
| const auto rep = get_replica(pid); |
| if (rep == nullptr) { |
| continue; |
| } |
| if (rep->status() == partition_status::PS_PRIMARY) { |
| dn->holding_primary_replicas[holding_replicas.first].emplace(pid); |
| } else if (rep->status() == partition_status::PS_SECONDARY) { |
| dn->holding_secondary_replicas[holding_replicas.first].emplace(pid); |
| } |
| } |
| } |
| } |
| } |
| |
| void replica_stub::on_bulk_load(bulk_load_rpc rpc) |
| { |
| const bulk_load_request &request = rpc.request(); |
| bulk_load_response &response = rpc.response(); |
| |
| LOG_INFO("[{}@{}]: receive bulk load request", request.pid, _primary_address_str); |
| replica_ptr rep = get_replica(request.pid); |
| if (rep != nullptr) { |
| rep->get_bulk_loader()->on_bulk_load(request, response); |
| } else { |
| LOG_ERROR("replica({}) is not existed", request.pid); |
| response.err = ERR_OBJECT_NOT_FOUND; |
| } |
| } |
| |
| void replica_stub::on_group_bulk_load(group_bulk_load_rpc rpc) |
| { |
| const group_bulk_load_request &request = rpc.request(); |
| group_bulk_load_response &response = rpc.response(); |
| |
| LOG_INFO("[{}@{}]: received group bulk load request, primary = {}, ballot = {}, " |
| "meta_bulk_load_status = {}", |
| request.config.pid, |
| _primary_address_str, |
| request.config.primary.to_string(), |
| request.config.ballot, |
| enum_to_string(request.meta_bulk_load_status)); |
| |
| replica_ptr rep = get_replica(request.config.pid); |
| if (rep != nullptr) { |
| rep->get_bulk_loader()->on_group_bulk_load(request, response); |
| } else { |
| LOG_ERROR("replica({}) is not existed", request.config.pid); |
| response.err = ERR_OBJECT_NOT_FOUND; |
| } |
| } |
| |
| void replica_stub::on_detect_hotkey(detect_hotkey_rpc rpc) |
| { |
| const auto &request = rpc.request(); |
| auto &response = rpc.response(); |
| |
| LOG_INFO("[{}@{}]: received detect hotkey request, hotkey_type = {}, detect_action = {}", |
| request.pid, |
| _primary_address_str, |
| enum_to_string(request.type), |
| enum_to_string(request.action)); |
| |
| replica_ptr rep = get_replica(request.pid); |
| if (rep != nullptr) { |
| rep->on_detect_hotkey(request, response); |
| } else { |
| response.err = ERR_OBJECT_NOT_FOUND; |
| response.err_hint = fmt::format("not find the replica {} \n", request.pid); |
| } |
| } |
| |
| void replica_stub::query_app_data_version( |
| int32_t app_id, /*pidx => data_version*/ std::unordered_map<int32_t, uint32_t> &version_map) |
| { |
| zauto_read_lock l(_replicas_lock); |
| for (const auto &kv : _replicas) { |
| if (kv.first.get_app_id() == app_id) { |
| replica_ptr rep = kv.second; |
| if (rep != nullptr) { |
| uint32_t data_version = rep->query_data_version(); |
| version_map[kv.first.get_partition_index()] = data_version; |
| } |
| } |
| } |
| } |
| |
| void replica_stub::query_app_manual_compact_status( |
| int32_t app_id, std::unordered_map<gpid, manual_compaction_status::type> &status) |
| { |
| zauto_read_lock l(_replicas_lock); |
| for (auto it = _replicas.begin(); it != _replicas.end(); ++it) { |
| if (it->first.get_app_id() == app_id) { |
| status[it->first] = it->second->get_manual_compact_status(); |
| } |
| } |
| } |
| |
void replica_stub::update_config(const std::string &name)
{
    // Hot-reload hook: apply the (already validated and updated) flag named
    // `name` to the runtime state it controls. Currently only the config-sync
    // timer interval is wired up.
    // The new value has been validated and FLAGS_* has been updated, it's safety to use it
    // directly.
    UPDATE_CONFIG(_config_sync_timer_task->update_interval, config_sync_interval_ms, name);
}
| |
void replica_stub::wait_closing_replicas_finished()
{
    // Block until every replica in '_closing_replicas' has finished closing.
    // Each close task erases its own entry from the map, so we repeatedly wait
    // on the first entry until the map drains.
    zauto_write_lock l(_replicas_lock);
    while (!_closing_replicas.empty()) {
        auto task = std::get<0>(_closing_replicas.begin()->second);
        auto first_gpid = _closing_replicas.begin()->first;

        // TODO(yingchun): improve the code
        // Drop the write lock while waiting: the close task needs it to remove
        // itself from '_closing_replicas', so holding it here would deadlock.
        _replicas_lock.unlock_write();
        task->wait();
        _replicas_lock.lock_write();

        // task will automatically remove this replica from '_closing_replicas'
        if (!_closing_replicas.empty()) {
            // Sanity check: the entry we waited on must be gone by now.
            CHECK_NE_MSG(first_gpid,
                         _closing_replicas.begin()->first,
                         "this replica '{}' should has been removed",
                         first_gpid);
        }
    }
}
| |
| } // namespace replication |
| } // namespace dsn |