/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
#include <unistd.h>

#include <chrono>
#include <climits>
#include <cstdlib>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <boost/lexical_cast.hpp>

#include <dsn/service_api_c.h>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/utility/rand.h>

#include <pegasus/client.h>
#include <gtest/gtest.h>

#include "base/pegasus_const.h"
#include "global_env.h"

using namespace dsn::replication;
using namespace pegasus;
| |
| class recovery_test : public testing::Test |
| { |
| protected: |
| virtual void SetUp() |
| { |
| // THREAD_POOL_META_SERVER worker count should be greater than 1 |
| // This function test update 'distributed_lock_service_type' to |
| // 'distributed_lock_service_simple', which executes in threadpool THREAD_POOL_META_SERVER |
| // As a result, failure detection lock executes in this pool |
| // if worker count = 1, it will lead to ERR_TIMEOUT when execute 'ddl_client->do_recovery' |
| |
| // 1. restart onebox, modify the config |
| chdir(global_env::instance()._pegasus_root.c_str()); |
| system("./run.sh clear_onebox"); |
| |
| system("cp src/server/config.min.ini config-server-test-recovery.ini"); |
| system("sed -i \"/^\\s*meta_state_service_type/c meta_state_service_type = " |
| "meta_state_service_simple\" config-server-test-recovery.ini"); |
| system("sed -i \"/^\\s*distributed_lock_service_type/c distributed_lock_service_type = " |
| "distributed_lock_service_simple\" config-server-test-recovery.ini"); |
| system("sed -i \"/^\\s*server_list/c server_list = @LOCAL_IP@:34601\" " |
| "config-server-test-recovery.ini"); |
| system("sed -i \"/^\\s*perf_counter_enable_logging/c perf_counter_enable_logging = false\" " |
| "config-server-test-recovery.ini"); |
| |
| system("./run.sh start_onebox -m 1 -r 3 --config_path config-server-test-recovery.ini"); |
| std::cout << "sleep for a while to wait the new onebox start" << std::endl; |
| std::this_thread::sleep_for(std::chrono::seconds(3)); |
| |
| chdir(global_env::instance()._working_dir.c_str()); |
| |
| // 2. initialize the clients |
| std::vector<dsn::rpc_address> meta_list; |
| replica_helper::load_meta_servers( |
| meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), "single_master_cluster"); |
| |
| ddl_client = std::make_shared<replication_ddl_client>(meta_list); |
| pg_client = pegasus::pegasus_client_factory::get_client("single_master_cluster", |
| table_name.c_str()); |
| |
| // 3. write some data to the app |
| dsn::error_code err; |
| // first create the app |
| err = ddl_client->create_app(table_name, "pegasus", default_partitions, 3, {}, false); |
| ASSERT_EQ(dsn::ERR_OK, err); |
| |
| // then write keys |
| std::cerr << "write " << set_count << " keys" << std::endl; |
| for (int i = 0; i < set_count; ++i) { |
| std::string hash_key = key_prefix + boost::lexical_cast<std::string>(i); |
| std::string sort_key = hash_key; |
| std::string value = value_prefix + boost::lexical_cast<std::string>(i); |
| |
| pegasus::pegasus_client::internal_info info; |
| int ans = pg_client->set(hash_key, sort_key, value, 5000, 0, &info); |
| ASSERT_EQ(0, ans); |
| ASSERT_TRUE(info.partition_index < default_partitions); |
| } |
| } |
| |
| virtual void TearDown() |
| { |
| chdir(global_env::instance()._pegasus_root.c_str()); |
| system("./run.sh clear_onebox"); |
| system("./run.sh start_onebox -w"); |
| chdir(global_env::instance()._working_dir.c_str()); |
| } |
| |
| public: |
| std::shared_ptr<replication_ddl_client> ddl_client; |
| pegasus::pegasus_client *pg_client; |
| |
| public: |
| std::vector<dsn::rpc_address> get_rpc_address_list(const std::vector<int> ports) |
| { |
| std::vector<dsn::rpc_address> result; |
| result.reserve(ports.size()); |
| for (const int &p : ports) { |
| dsn::rpc_address address(global_env::instance()._host_ip.c_str(), p); |
| result.push_back(address); |
| } |
| return result; |
| } |
| |
| void stop_replica(int id) |
| { |
| char command[512]; |
| snprintf(command, |
| 512, |
| "cd %s && ./run.sh stop_onebox_instance -r %d", |
| global_env::instance()._pegasus_root.c_str(), |
| id); |
| system(command); |
| } |
| |
| void stop_meta(int id) |
| { |
| char command[512]; |
| snprintf(command, |
| 512, |
| "cd %s && ./run.sh stop_onebox_instance -m %d", |
| global_env::instance()._pegasus_root.c_str(), |
| id); |
| system(command); |
| } |
| |
| void start_meta(int id) |
| { |
| char command[512]; |
| snprintf(command, |
| 512, |
| "cd %s && ./run.sh start_onebox_instance -m %d", |
| global_env::instance()._pegasus_root.c_str(), |
| id); |
| system(command); |
| } |
| |
| void start_replica(int id) |
| { |
| char command[512]; |
| snprintf(command, |
| 512, |
| "cd %s && ./run.sh start_onebox_instance -r %d", |
| global_env::instance()._pegasus_root.c_str(), |
| id); |
| system(command); |
| } |
| |
| void clear_remote_storage() |
| { |
| char command[512]; |
| snprintf(command, |
| 512, |
| "cd %s && rm -rf onebox/meta1/data/meta/meta_state_service.log", |
| global_env::instance()._pegasus_root.c_str()); |
| system(command); |
| } |
| |
| void config_meta_to_do_cold_recovery() |
| { |
| char command[512]; |
| snprintf( |
| command, |
| 512, |
| "cd %s && sed -i \"/^\\s*recover_from_replica_server/c recover_from_replica_server = " |
| "true\" onebox/meta1/config.ini", |
| global_env::instance()._pegasus_root.c_str()); |
| system(command); |
| } |
| |
| void delete_replica(int replica_id, int app_id, int partition_id) |
| { |
| char command[512]; |
| snprintf(command, |
| 512, |
| "cd %s/onebox/replica%d/data/replica/reps && rm -rf %d.%d.pegasus", |
| global_env::instance()._pegasus_root.c_str(), |
| replica_id, |
| app_id, |
| partition_id); |
| std::cout << command << std::endl; |
| system(command); |
| } |
| |
| void delete_replicas_for_app_id(int replica_id, int app_id) |
| { |
| char command[512]; |
| snprintf(command, |
| 512, |
| "cd %s/onebox/replica%d/data/replica/reps && rm -rf %d.*.pegasus", |
| global_env::instance()._pegasus_root.c_str(), |
| replica_id, |
| app_id); |
| std::cout << command << std::endl; |
| system(command); |
| } |
| |
| // 1. stop replicas |
| // 2. clear the remote storage and set meta to recover mode |
| void prepare_recovery() |
| { |
| // then stop all jobs |
| stop_meta(1); |
| for (int i = 1; i <= 3; ++i) { |
| stop_replica(i); |
| } |
| |
| clear_remote_storage(); |
| config_meta_to_do_cold_recovery(); |
| // sleep some time, in case that the socket is time-wait |
| std::cout << "sleep for a while to wait the socket to destroy" << std::endl; |
| std::this_thread::sleep_for(std::chrono::seconds(10)); |
| } |
| |
| void verify_data(int count) |
| { |
| // then check to read all keys |
| for (int i = 0; i < count; ++i) { |
| std::string hash_key = key_prefix + boost::lexical_cast<std::string>(i); |
| std::string sort_key = hash_key; |
| std::string exp_value = value_prefix + boost::lexical_cast<std::string>(i); |
| |
| std::string act_value; |
| int ans = pg_client->get(hash_key, sort_key, act_value); |
| ASSERT_EQ(0, ans); |
| ASSERT_EQ(exp_value, act_value); |
| } |
| } |
| |
| static const std::string table_name; |
| static const std::string key_prefix; |
| static const std::string value_prefix; |
| static const int default_partitions = 4; |
| static const int set_count = 2048; |
| }; |
| |
| const std::string recovery_test::table_name = "test_table"; |
| const std::string recovery_test::key_prefix = "hello_key"; |
| const std::string recovery_test::value_prefix = "world_key"; |
| |
| TEST_F(recovery_test, recovery) |
| { |
| dsn::error_code err; |
| |
| // first test the basic recovery |
| std::cout << ">>>>> test basic recovery <<<<<" << std::endl; |
| { |
| prepare_recovery(); |
| // start all jobs again |
| for (int i = 1; i <= 3; ++i) { |
| start_replica(i); |
| } |
| std::cout << "sleep for a while to wait the replica to start" << std::endl; |
| std::this_thread::sleep_for(std::chrono::seconds(10)); |
| start_meta(1); |
| |
| std::cout << "sleep for a while to wait the meta to come to alive" << std::endl; |
| std::this_thread::sleep_for(std::chrono::seconds(5)); |
| |
| // then do recovery |
| auto nodes = get_rpc_address_list({34801, 34802, 34803}); |
| err = ddl_client->do_recovery(nodes, 30, false, false, std::string()); |
| ASSERT_EQ(dsn::ERR_OK, err); |
| |
| // send another recovery command |
| err = ddl_client->do_recovery(nodes, 30, false, false, std::string()); |
| ASSERT_EQ(dsn::ERR_SERVICE_ALREADY_RUNNING, err); |
| |
| // then wait the apps to ready |
| err = ddl_client->create_app(table_name, "pegasus", default_partitions, 3, {}, false); |
| ASSERT_EQ(dsn::ERR_OK, err); |
| |
| verify_data(set_count); |
| } |
| |
| // recover from subset of all nodes |
| std::cout << ">>>>> test recovery from subset of all nodes <<<<<" << std::endl; |
| { |
| prepare_recovery(); |
| for (int i = 1; i <= 3; ++i) |
| start_replica(i); |
| std::cout << "sleep for a while to wait the replica to start" << std::endl; |
| std::this_thread::sleep_for(std::chrono::seconds(10)); |
| start_meta(1); |
| |
| std::cout << "sleep for a while to wait the meta to come to alive" << std::endl; |
| std::this_thread::sleep_for(std::chrono::seconds(5)); |
| |
| // recovery only from 1 & 2 |
| std::vector<dsn::rpc_address> nodes = get_rpc_address_list({34801, 34802}); |
| ddl_client->do_recovery(nodes, 30, false, false, std::string()); |
| ASSERT_EQ(dsn::ERR_OK, err); |
| |
| // then wait the app to ready |
| err = ddl_client->create_app(table_name, "pegasus", default_partitions, 3, {}, false); |
| ASSERT_EQ(dsn::ERR_OK, err); |
| |
| verify_data(set_count); |
| } |
| |
| // recovery from whole, but some partitions has been removed |
| std::cout << ">>>>> test recovery, some partitions have been lost <<<<<" << std::endl; |
| { |
| prepare_recovery(); |
| for (int i = 0; i < default_partitions; ++i) { |
| int replica_id = dsn::rand::next_u32(1, 3); |
| delete_replica(replica_id, 2, i); |
| } |
| |
| // start all jobs again |
| for (int i = 1; i <= 3; ++i) { |
| start_replica(i); |
| } |
| std::cout << "sleep for a while to wait the replica to start" << std::endl; |
| std::this_thread::sleep_for(std::chrono::seconds(10)); |
| start_meta(1); |
| |
| std::cout << "sleep for a while to wait the meta to come to alive" << std::endl; |
| std::this_thread::sleep_for(std::chrono::seconds(5)); |
| |
| // then do recovery |
| auto nodes = get_rpc_address_list({34801, 34802, 34803}); |
| err = ddl_client->do_recovery(nodes, 30, false, false, std::string()); |
| ASSERT_EQ(dsn::ERR_OK, err); |
| |
| // then wait the apps to ready |
| err = ddl_client->create_app(table_name, "pegasus", default_partitions, 3, {}, false); |
| ASSERT_EQ(dsn::ERR_OK, err); |
| |
| verify_data(set_count); |
| } |
| |
| // some apps has been totally removed |
| std::cout << ">>>>> test recovery, app 1 is removed <<<<<" << std::endl; |
| { |
| prepare_recovery(); |
| for (int i = 1; i < 4; ++i) { |
| delete_replicas_for_app_id(i, 1); |
| } |
| |
| // start all jobs again |
| for (int i = 1; i <= 3; ++i) { |
| start_replica(i); |
| } |
| std::cout << "sleep for a while to wait the replica to start" << std::endl; |
| std::this_thread::sleep_for(std::chrono::seconds(10)); |
| start_meta(1); |
| |
| std::cout << "sleep for a while to wait the meta to come to alive" << std::endl; |
| std::this_thread::sleep_for(std::chrono::seconds(5)); |
| |
| // then do recovery |
| auto nodes = get_rpc_address_list({34801, 34802, 34803}); |
| err = ddl_client->do_recovery(nodes, 30, false, false, std::string()); |
| ASSERT_EQ(dsn::ERR_OK, err); |
| |
| // then wait the apps to ready |
| err = ddl_client->create_app(table_name, "pegasus", default_partitions, 3, {}, false); |
| ASSERT_EQ(dsn::ERR_OK, err); |
| |
| verify_data(set_count); |
| } |
| } |