blob: ccd83ee7443fdb11a0f32f54f778261e114dff8f [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <boost/lexical_cast.hpp>
#include <algorithm>
#include <cstdint>
#include <fstream> // IWYU pragma: keep
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "dsn.layer2_types.h"
#include "gtest/gtest.h"
#include "meta/meta_data.h"
#include "meta/meta_service.h"
#include "meta/meta_state_service.h"
#include "meta/server_state.h"
#include "meta/test/misc/misc.h"
#include "meta_admin_types.h"
#include "meta_service_test_app.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/task/task.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/strings.h"
#include "utils/utils.h"
namespace dsn {
namespace replication {
class meta_options;
DSN_DECLARE_string(cluster_root);
DSN_DECLARE_string(meta_state_service_type);
/// Fills every partition configuration of `app` with a random replica layout.
///
/// Picks are drawn as strictly increasing "slots" in [0, (server_list.size() - 1) * 2 - 1]:
/// an even slot k stands for server_list[k / 2], an odd slot stands for an invalid (empty)
/// address. At most `max_replica_count` slots are drawn per partition. The first slot becomes
/// the primary (even if it is invalid); the remaining valid ones become secondaries. The last
/// server in `server_list` can never be drawn and is instead stored as the sole `last_drops`
/// entry of every partition.
///
/// NOTE(review): assumes server_list.size() >= 2; with fewer servers no slot is drawn and the
/// first pick is read from an empty vector.
static void random_assign_partition_config(std::shared_ptr<app_state> &app,
                                           const std::vector<dsn::rpc_address> &server_list,
                                           int max_replica_count)
{
    const int last_slot = (server_list.size() - 1) * 2 - 1;
    auto slot_to_address = [&server_list](int slot) -> dsn::rpc_address {
        return (slot % 2 == 0) ? server_list[slot / 2] : dsn::rpc_address();
    };

    for (dsn::partition_configuration &pc : app->partitions) {
        // Draw increasing slots until we have enough or the range is exhausted.
        std::vector<int> picks;
        int lowest_allowed = 0;
        while (static_cast<int>(picks.size()) < max_replica_count && lowest_allowed <= last_slot) {
            picks.push_back(random32(lowest_allowed, last_slot));
            lowest_allowed = picks.back() + 1;
        }

        pc.primary = slot_to_address(picks.front());
        for (auto it = picks.begin() + 1; it != picks.end(); ++it) {
            dsn::rpc_address secondary = slot_to_address(*it);
            if (!secondary.is_invalid()) {
                pc.secondaries.push_back(secondary);
            }
        }
        pc.last_drops = {server_list.back()};
    }
}
static void file_data_compare(const char *fname1, const char *fname2)
{
static const int length = 4096;
std::shared_ptr<char> buffer(dsn::utils::make_shared_array<char>(length * 2));
char *buf1 = buffer.get(), *buf2 = buffer.get() + length;
std::ifstream ifile1(fname1, std::ios::in | std::ios::binary);
std::ifstream ifile2(fname2, std::ios::in | std::ios::binary);
auto file_length = [](std::ifstream &is) {
is.seekg(0, is.end);
int result = is.tellg();
is.seekg(0, is.beg);
return result;
};
int l = file_length(ifile1);
ASSERT_EQ(l, file_length(ifile2));
for (int i = 0; i < l; i += length) {
int up_to_bytes = length < (l - i) ? length : (l - i);
ifile1.read(buf1, up_to_bytes);
ifile2.read(buf2, up_to_bytes);
ASSERT_TRUE(utils::mequals(buf1, buf2, up_to_bytes));
}
}
void meta_service_test_app::state_sync_test()
{
int apps_count = 15;
int drop_ratio = 5;
std::vector<dsn::rpc_address> server_list;
std::vector<int> drop_set;
generate_node_list(server_list, 10, 10);
std::shared_ptr<meta_service> meta_svc = std::make_shared<meta_service>();
meta_service *svc = meta_svc.get();
meta_options &opt = svc->_meta_opts;
FLAGS_cluster_root = "/meta_test";
FLAGS_meta_state_service_type = "meta_state_service_simple";
svc->remote_storage_initialize();
std::string apps_root = "/meta_test/apps";
std::shared_ptr<server_state> ss1 = svc->_state;
// create apss randomly, and sync it to meta state service simple
std::cerr << "testing create apps and sync to remote storage" << std::endl;
{
server_state *ss = ss1.get();
ss->initialize(svc, apps_root);
drop_set.clear();
for (int i = 1; i <= apps_count; ++i) {
dsn::app_info info;
info.is_stateful = true;
info.app_id = i;
info.app_type = "simple_kv";
info.app_name = "test_app" + boost::lexical_cast<std::string>(i);
info.max_replica_count = 3;
info.partition_count = random32(100, 10000);
info.status = dsn::app_status::AS_CREATING;
std::shared_ptr<app_state> app = app_state::create(info);
ss->_all_apps.emplace(app->app_id, app);
if (i < apps_count && random32(1, apps_count) <= drop_ratio) {
app->status = dsn::app_status::AS_DROPPING;
drop_set.push_back(i);
app->app_name = "test_app" + boost::lexical_cast<std::string>(apps_count);
}
}
for (int i = 1; i <= apps_count; ++i) {
std::shared_ptr<app_state> app = ss->get_app(i);
random_assign_partition_config(app, server_list, 3);
if (app->status == dsn::app_status::AS_DROPPING) {
for (int j = 0; j < app->partition_count; ++j) {
app->partitions[j].partition_flags = pc_flags::dropped;
}
}
}
dsn::error_code ec = ss->sync_apps_to_remote_storage();
ASSERT_EQ(ec, dsn::ERR_OK);
ss->spin_wait_staging();
}
// then we sync from meta_state_service_simple, and dump to local file
std::cerr << "testing sync from remote storage and dump to local file" << std::endl;
{
std::shared_ptr<server_state> ss2 = std::make_shared<server_state>();
ss2->initialize(svc, apps_root);
dsn::error_code ec = ss2->sync_apps_from_remote_storage();
ASSERT_EQ(ec, dsn::ERR_OK);
for (int i = 1; i <= apps_count; ++i) {
std::shared_ptr<app_state> app = ss2->get_app(i);
for (int j = 0; j < app->partition_count; ++j) {
config_context &cc = app->helpers->contexts[j];
ASSERT_EQ(1, cc.dropped.size());
ASSERT_NE(cc.dropped.end(), cc.find_from_dropped(server_list.back()));
}
}
ec = ss2->dump_from_remote_storage("meta_state.dump1", false);
ASSERT_EQ(ec, dsn::ERR_OK);
}
// dump another way
std::cerr << "testing directly dump to local file" << std::endl;
{
std::shared_ptr<server_state> ss2 = std::make_shared<server_state>();
ss2->initialize(svc, apps_root);
dsn::error_code ec = ss2->dump_from_remote_storage("meta_state.dump2", true);
ASSERT_EQ(ec, dsn::ERR_OK);
file_data_compare("meta_state.dump1", "meta_state.dump2");
}
FLAGS_meta_state_service_type = "meta_state_service_zookeeper";
svc->remote_storage_initialize();
// first clean up
std::cerr << "start to clean up zookeeper storage" << std::endl;
{
dsn::error_code ec;
dsn::dist::meta_state_service *storage = svc->get_remote_storage();
storage
->delete_node(apps_root,
true,
LPC_META_CALLBACK,
[&ec](dsn::error_code error) { ec = error; },
nullptr)
->wait();
ASSERT_TRUE(dsn::ERR_OK == ec || dsn::ERR_OBJECT_NOT_FOUND == ec);
}
std::cerr << "test sync to zookeeper's remote storage" << std::endl;
// restore from the local file, and restore to zookeeper
{
std::shared_ptr<server_state> ss2 = std::make_shared<server_state>();
ss2->initialize(svc, apps_root);
dsn::error_code ec = ss2->restore_from_local_storage("meta_state.dump2");
ASSERT_EQ(ec, dsn::ERR_OK);
}
// then sync from zookeeper
std::cerr << "test sync from zookeeper's storage" << std::endl;
{
std::shared_ptr<server_state> ss2 = std::make_shared<server_state>();
ss2->initialize(svc, apps_root);
dsn::error_code ec = ss2->initialize_data_structure();
ASSERT_EQ(ec, dsn::ERR_OK);
app_mapper_compare(ss1->_all_apps, ss2->_all_apps);
ASSERT_EQ(ss1->_exist_apps.size(), ss2->_exist_apps.size());
for (const auto &iter : ss1->_exist_apps) {
ASSERT_TRUE(ss2->_exist_apps.find(iter.first) != ss2->_exist_apps.end());
}
ASSERT_EQ(ss1->_table_metric_entities, ss2->_table_metric_entities);
// then we dump the content to local file with binary format
std::cerr << "test dump to local file from zookeeper's storage" << std::endl;
ec = ss2->dump_from_remote_storage("meta_state.dump3", false);
ASSERT_EQ(ec, dsn::ERR_OK);
}
// then we restore from local storage and restore to remote
{
std::shared_ptr<server_state> ss2 = std::make_shared<server_state>();
ss2->initialize(svc, apps_root);
dsn::error_code ec = ss2->restore_from_local_storage("meta_state.dump3");
ASSERT_EQ(ec, dsn::ERR_OK);
app_mapper_compare(ss1->_all_apps, ss2->_all_apps);
ASSERT_TRUE(ss1->_exist_apps.size() == ss2->_exist_apps.size());
for (const auto &iter : ss1->_exist_apps) {
ASSERT_TRUE(ss2->_exist_apps.find(iter.first) != ss2->_exist_apps.end());
}
ASSERT_EQ(ss1->_table_metric_entities, ss2->_table_metric_entities);
ss2->initialize_node_state();
// then let's test the query configuration calls
// 1.1. normal gpid
dsn::gpid gpid = {15, 0};
dsn::partition_configuration pc;
ASSERT_TRUE(ss2->query_configuration_by_gpid(gpid, pc));
ASSERT_EQ(ss1->_all_apps[15]->partitions[0], pc);
// 1.2 dropped app
if (!drop_set.empty()) {
gpid.set_app_id(drop_set[0]);
ASSERT_FALSE(ss2->query_configuration_by_gpid(gpid, pc));
}
// 2.1 query configuration by index
dsn::query_cfg_request req;
dsn::query_cfg_response resp;
req.app_name = "test_app15";
req.partition_indices = {-1, 1, 2, 3, 0x7fffffff};
std::shared_ptr<app_state> app_created = ss1->get_app(15);
ss2->query_configuration_by_index(req, resp);
ASSERT_EQ(dsn::ERR_OK, resp.err);
ASSERT_EQ(15, resp.app_id);
ASSERT_EQ(app_created->partition_count, resp.partition_count);
ASSERT_EQ(resp.partitions.size(), 3);
for (int i = 1; i <= 3; ++i)
ASSERT_EQ(resp.partitions[i - 1], app_created->partitions[i]);
// 2.2 no exist app
req.app_name = "make_no_sense";
ss2->query_configuration_by_index(req, resp);
ASSERT_EQ(dsn::ERR_OBJECT_NOT_FOUND, resp.err);
// 2.3 app is dropping/creating/recalling
std::shared_ptr<app_state> app = ss2->get_app(15);
req.app_name = app->app_name;
ss2->query_configuration_by_index(req, resp);
ASSERT_EQ(dsn::ERR_OK, resp.err);
app->status = dsn::app_status::AS_DROPPING;
ss2->query_configuration_by_index(req, resp);
ASSERT_EQ(dsn::ERR_BUSY_DROPPING, resp.err);
app->status = dsn::app_status::AS_RECALLING;
ss2->query_configuration_by_index(req, resp);
ASSERT_EQ(dsn::ERR_BUSY_CREATING, resp.err);
app->status = dsn::app_status::AS_CREATING;
ss2->query_configuration_by_index(req, resp);
ASSERT_EQ(dsn::ERR_BUSY_CREATING, resp.err);
// client unknown state
app->status = dsn::app_status::AS_DROP_FAILED;
ss2->query_configuration_by_index(req, resp);
ASSERT_EQ(dsn::ERR_UNKNOWN, resp.err);
}
// simulate the half creating
std::cerr << "test some node for a app is not create on remote storage" << std::endl;
{
std::shared_ptr<server_state> ss2 = std::make_shared<server_state>();
dsn::error_code ec;
ss2->initialize(svc, apps_root);
dsn::dist::meta_state_service *storage = svc->get_remote_storage();
storage
->delete_node(ss2->get_partition_path(dsn::gpid{apps_count, 0}),
false,
LPC_META_CALLBACK,
[&ec](dsn::error_code error) { ec = error; },
nullptr)
->wait();
ASSERT_EQ(ec, dsn::ERR_OK);
ec = ss2->sync_apps_from_remote_storage();
ASSERT_EQ(ec, dsn::ERR_OK);
ASSERT_TRUE(ss2->spin_wait_staging(30));
}
}
/// Builds a stateful "pegasus" app_info for tests, with max_replica_count = 3 and
/// expire_second = 0.
///
/// @param status          status stored in the returned app_info
/// @param app_name        table name; taken by value and moved into the result
/// @param id              app id
/// @param partition_count number of partitions
/// @return the populated app_info
static dsn::app_info create_app_info(dsn::app_status::type status,
                                     std::string app_name,
                                     int32_t id,
                                     int32_t partition_count)
{
    dsn::app_info info;
    info.status = status;
    info.app_type = "pegasus";
    // app_name is a by-value sink parameter: move it instead of copying it a second time.
    info.app_name = std::move(app_name);
    info.app_id = id;
    info.partition_count = partition_count;
    info.is_stateful = true;
    info.max_replica_count = 3;
    info.expire_second = 0;
    return info;
}
void meta_service_test_app::construct_apps_test()
{
std::vector<dsn::app_info> apps = {
create_app_info(dsn::app_status::AS_AVAILABLE, "test__4", 2, 10),
create_app_info(dsn::app_status::AS_AVAILABLE, "test", 4, 20),
create_app_info(dsn::app_status::AS_AVAILABLE, "test", 6, 30)};
query_app_info_response resp;
resp.apps = apps;
resp.err = dsn::ERR_OK;
std::shared_ptr<meta_service> svc(new meta_service());
std::vector<dsn::rpc_address> nodes;
std::string hint_message;
generate_node_list(nodes, 1, 1);
svc->_state->construct_apps({resp}, nodes, hint_message);
meta_view mv = svc->_state->get_meta_view();
const app_mapper &mapper = *(mv.apps);
ASSERT_EQ(6, mv.apps->size());
std::vector<dsn::app_info> result_apps = {
create_app_info(dsn::app_status::AS_DROPPING, "__drop_holder__1", 1, 1),
create_app_info(dsn::app_status::AS_CREATING, "test__4__2", 2, 10),
create_app_info(dsn::app_status::AS_DROPPING, "__drop_holder__3", 3, 1),
create_app_info(dsn::app_status::AS_CREATING, "test__4", 4, 20),
create_app_info(dsn::app_status::AS_DROPPING, "__drop_holder__5", 5, 1),
create_app_info(dsn::app_status::AS_CREATING, "test", 6, 30)};
int i = 0;
for (const auto &kv_pair : mapper) {
ASSERT_EQ(kv_pair.second->app_id, result_apps[i].app_id);
ASSERT_EQ(kv_pair.second->app_name, result_apps[i].app_name);
ASSERT_EQ(kv_pair.second->app_type, result_apps[i].app_type);
ASSERT_EQ(kv_pair.second->partition_count, result_apps[i].partition_count);
ASSERT_EQ(kv_pair.second->max_replica_count, result_apps[i].max_replica_count);
ASSERT_EQ(kv_pair.second->is_stateful, result_apps[i].is_stateful);
ASSERT_EQ(kv_pair.second->status, result_apps[i].status);
i++;
}
}
} // namespace replication
} // namespace dsn