// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
#include <fmt/format.h>
#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/iterator.h>
#include <rocksdb/metadata.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/sst_file_reader.h>
#include <rocksdb/sst_file_writer.h>
#include <rocksdb/table_properties.h>
#include <rocksdb/threadpool.h>
#include <stdio.h>
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "base/meta_store.h"
#include "base/pegasus_key_schema.h"
#include "base/value_schema_manager.h"
#include "client/partition_resolver.h"
#include "client/replication_ddl_client.h"
#include "common/gpid.h"
#include "common/replication_common.h"
#include "dsn.layer2_types.h"
#include "pegasus_value_schema.h"
#include "replica/replica_stub.h"
#include "replica/replication_app_base.h"
#include "shell/argh.h"
#include "shell/command_executor.h"
#include "shell/command_helper.h"
#include "shell/commands.h"
#include "utils/blob.h"
#include "utils/errors.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/load_dump_object.h"
#include "utils/output_utils.h"
const std::string local_partition_split_help =
"<src_data_dirs> <dst_data_dirs> <src_app_id> "
"<dst_app_id> <src_partition_ids> <src_partition_count> "
"<dst_partition_count> <dst_app_name> [--post_full_compact] [--post_count] "
"[--threads_per_data_dir num] [--threads_per_partition num]";
struct ToSplitPartition
{
std::string replica_dir;
dsn::app_info ai;
dsn::replication::replica_init_info rii;
int32_t pidx = 0;
};
struct LocalPartitionSplitContext
{
// Parameters from the command line.
std::vector<std::string> src_data_dirs;
std::vector<std::string> dst_data_dirs;
uint32_t src_app_id = 0;
uint32_t dst_app_id = 0;
std::set<uint32_t> src_partition_ids;
uint32_t src_partition_count = 0;
uint32_t dst_partition_count = 0;
uint32_t threads_per_data_dir = 1;
uint32_t threads_per_partition = 1;
std::string dst_app_name;
bool post_full_compact = false;
bool post_count = false;
    // Derived from the parameters above: the number of destination partitions that
    // each source partition fans out into.
uint32_t split_count = 0;
};
struct FileSplitResult
{
std::string filename;
bool success = false;
std::vector<uint64_t> split_counts;
};
struct PartitionSplitResult
{
std::string src_replica_dir;
std::map<std::string, int64_t> key_count_by_dst_replica_dirs;
bool success = false;
std::vector<FileSplitResult> fsrs;
};
struct DataDirSplitResult
{
std::string src_data_dir;
std::string dst_data_dir;
bool success = false;
std::vector<PartitionSplitResult> psrs;
};
bool validate_parameters(LocalPartitionSplitContext &lpsc)
{
// TODO(yingchun): check disk space.
// Check <src_data_dirs> and <dst_data_dirs>.
RETURN_FALSE_IF_NOT(
lpsc.src_data_dirs.size() == lpsc.dst_data_dirs.size(),
"invalid command, the list size of <src_data_dirs> and <dst_data_dirs> must be equal");
// Check <dst_app_id>.
RETURN_FALSE_IF_NOT(
lpsc.src_app_id != lpsc.dst_app_id,
"invalid command, <src_app_id> and <dst_app_id> should not be equal ({} vs. {})",
lpsc.src_app_id,
lpsc.dst_app_id);
// Check <src_partition_ids>.
for (const auto src_partition_id : lpsc.src_partition_ids) {
        RETURN_FALSE_IF_NOT(
            src_partition_id < lpsc.src_partition_count,
            "invalid command, partition id {} in <src_partition_ids> should be in range [0, {})",
            src_partition_id,
            lpsc.src_partition_count);
}
// Check <dst_partition_count>.
RETURN_FALSE_IF_NOT(lpsc.dst_partition_count > lpsc.src_partition_count,
"invalid command, <dst_partition_count> should be larger than "
"<src_partition_count> ({} vs. {})",
lpsc.dst_partition_count,
lpsc.src_partition_count);
    lpsc.split_count = lpsc.dst_partition_count / lpsc.src_partition_count;
    const auto log2n = static_cast<uint32_t>(log2(lpsc.split_count));
    RETURN_FALSE_IF_NOT(lpsc.dst_partition_count % lpsc.src_partition_count == 0 &&
                            pow(2, log2n) == lpsc.split_count,
                        "invalid command, <dst_partition_count> should be 2^n times "
                        "<src_partition_count> ({} vs. {})",
                        lpsc.dst_partition_count,
                        lpsc.src_partition_count);
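    // For example, src_partition_count = 4 and dst_partition_count = 16 yield
    // split_count = 4, i.e. each source partition fans out into 4 new partitions.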
const auto es = replication_ddl_client::validate_app_name(lpsc.dst_app_name);
RETURN_FALSE_IF_NOT(es.is_ok(),
"invalid command, <dst_app_name> '{}' is invalid: {}",
lpsc.dst_app_name,
es.description());
return true;
}
std::string construct_split_directory(const std::string &parent_dir,
                                      const ToSplitPartition &tsp,
uint32_t dst_app_id,
uint32_t split_index)
{
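    // The new replica directory is named '<dst_app_id>.<new_pidx>.pegasus', where
    // new_pidx = pidx + split_index * src_partition_count: these are the only
    // partition indexes that keys of source partition 'pidx' can hash to in the
    // enlarged table (see the notes in split_file()).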
return fmt::format("{}/{}.{}.pegasus",
parent_dir,
dst_app_id,
tsp.pidx + split_index * tsp.ai.partition_count);
}
bool split_file(const LocalPartitionSplitContext &lpsc,
                const ToSplitPartition &tsp,
const rocksdb::LiveFileMetaData &file,
const std::string &tmp_split_replicas_dir,
uint32_t pegasus_data_version,
FileSplitResult &sfr)
{
const auto src_sst_file = dsn::utils::filesystem::path_combine(file.db_path, file.name);
// 1. Open reader.
// TODO(yingchun): improve options.
auto reader = std::make_unique<rocksdb::SstFileReader>(rocksdb::Options());
RETURN_FALSE_IF_NON_RDB_OK(
reader->Open(src_sst_file), "open reader file '{}' failed", src_sst_file);
RETURN_FALSE_IF_NON_RDB_OK(
reader->VerifyChecksum(), "verify reader file '{}' failed", src_sst_file);
// 2. Validate the files.
const auto tbl_ppts = reader->GetTableProperties();
// The metadata column family file has been skipped in the previous steps.
CHECK_NE(tbl_ppts->column_family_name, pegasus::server::meta_store::META_COLUMN_FAMILY_NAME);
    // TODO(yingchun): It seems the SstFileReader can only read the live key-value
    // pairs in an sst file. If a key-value pair was put at a higher level and then
    // deleted at a lower level, it can still be read when iterating the higher-level
    // sst file alone, which means the deleted key-value pair would reappear.
    // So a full compaction is needed before using the 'local_partition_split' tool,
    // to remove such keys from the DB.
    // The following validators check the sst file for this.
RETURN_FALSE_IF_NOT(tbl_ppts->num_deletions == 0,
"invalid sst file '{}', it contains {} deletions",
src_sst_file,
tbl_ppts->num_deletions);
RETURN_FALSE_IF_NOT(tbl_ppts->num_merge_operands == 0,
"invalid sst file '{}', it contains {} merge_operands",
src_sst_file,
tbl_ppts->num_merge_operands);
RETURN_FALSE_IF_NOT(tbl_ppts->num_range_deletions == 0,
"invalid sst file '{}', it contains {} range_deletions",
src_sst_file,
tbl_ppts->num_range_deletions);
// 3. Prepare the split temporary directories.
std::vector<std::string> dst_tmp_rdb_dirs;
dst_tmp_rdb_dirs.resize(lpsc.split_count);
    for (uint32_t i = 0; i < lpsc.split_count; i++) {
const auto dst_tmp_rdb_dir =
construct_split_directory(tmp_split_replicas_dir, tsp, lpsc.dst_app_id, i);
RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(dst_tmp_rdb_dir),
"create directory '{}' failed",
dst_tmp_rdb_dir);
dst_tmp_rdb_dirs[i] = dst_tmp_rdb_dir;
}
    // 4. Iterate over the sst file through the sst reader, then split it into
    // multiple sst files through the sst writers.
std::vector<std::shared_ptr<rocksdb::SstFileWriter>> writers(lpsc.split_count);
std::unique_ptr<rocksdb::Iterator> iter(reader->NewIterator({}));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
const auto &skey = iter->key();
const auto &svalue = iter->value();
// Skip empty write, see:
// https://pegasus.apache.org/zh/2018/03/07/last_flushed_decree.html.
if (skey.empty() && pegasus::value_schema_manager::instance()
.get_value_schema(pegasus_data_version)
->extract_user_data(svalue.ToString())
.empty()) {
continue;
}
// i. Calculate the hash value and corresponding new partition index.
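        // 'bb_key' is a non-owning view over the iterator's key buffer, so it must
        // not outlive the current loop iteration.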
dsn::blob bb_key(skey.data(), 0, skey.size());
uint64_t hash_value = pegasus::pegasus_key_hash(bb_key);
const auto new_pidx = dsn::replication::partition_resolver::get_partition_index(
static_cast<int>(lpsc.dst_partition_count), hash_value);
CHECK_LE(0, new_pidx);
CHECK_LT(new_pidx, lpsc.dst_partition_count);
// ii. Calculate the writer index.
const auto writer_idx = new_pidx / lpsc.src_partition_count;
CHECK_LE(0, writer_idx);
CHECK_LT(writer_idx, lpsc.split_count);
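        // get_partition_index() is essentially 'hash % partition_count', and
        // dst_partition_count = split_count * src_partition_count, so
        // new_pidx % src_partition_count == tsp.pidx and 'writer_idx' enumerates the
        // split_count possible destination partitions of this source partition.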
// TODO(yingchun): improve to check expired data.
// iii. Create the writer if needed.
auto &dst_writer = writers[writer_idx];
if (!dst_writer) {
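            // NOTE: 'file.name' reported by LiveFileMetaData starts with '/', so the
            // plain "{}{}" concatenation below forms a valid path.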
const auto dst_tmp_rdb_file =
fmt::format("{}{}", dst_tmp_rdb_dirs[writer_idx], file.name);
// TODO(yingchun): improve options.
dst_writer =
std::make_shared<rocksdb::SstFileWriter>(rocksdb::EnvOptions(), rocksdb::Options());
RETURN_FALSE_IF_NON_RDB_OK(dst_writer->Open(dst_tmp_rdb_file),
"open writer file '{}' failed",
dst_tmp_rdb_file);
}
// iv. Write data to the new partition sst file.
sfr.split_counts[writer_idx]++;
RETURN_FALSE_IF_NON_RDB_OK(dst_writer->Put(skey, svalue),
"write data failed when split from file {}",
src_sst_file);
}
// 5. Finalize the writers.
    for (uint32_t i = 0; i < lpsc.split_count; i++) {
        // Skip the writers that were never opened (no key was routed to them).
if (sfr.split_counts[i] == 0) {
CHECK_TRUE(writers[i] == nullptr);
continue;
}
RETURN_FALSE_IF_NON_RDB_OK(writers[i]->Finish(nullptr),
"finalize writer split from file '{}' failed",
src_sst_file);
}
return true;
}
bool open_rocksdb(const rocksdb::DBOptions &db_opts,
const std::string &rdb_dir,
bool read_only,
const std::vector<rocksdb::ColumnFamilyDescriptor> &cf_dscs,
std::vector<rocksdb::ColumnFamilyHandle *> *cf_hdls,
rocksdb::DB **db)
{
CHECK_NOTNULL(cf_hdls, "");
CHECK_NOTNULL(db, "");
if (read_only) {
RETURN_FALSE_IF_NON_RDB_OK(
rocksdb::DB::OpenForReadOnly(db_opts, rdb_dir, cf_dscs, cf_hdls, db),
"open rocksdb in '{}' failed",
rdb_dir);
} else {
RETURN_FALSE_IF_NON_RDB_OK(rocksdb::DB::Open(db_opts, rdb_dir, cf_dscs, cf_hdls, db),
"open rocksdb in '{}' failed",
rdb_dir);
}
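    // RocksDB returns the handles in the same order as 'cf_dscs': the data column
    // family first, then the meta column family.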
CHECK_EQ(2, cf_hdls->size());
CHECK_EQ(pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, (*cf_hdls)[0]->GetName());
CHECK_EQ(pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, (*cf_hdls)[1]->GetName());
return true;
}
void release_db(std::vector<rocksdb::ColumnFamilyHandle *> *cf_hdls, rocksdb::DB **db)
{
CHECK_NOTNULL(cf_hdls, "");
CHECK_NOTNULL(db, "");
for (auto cf_hdl : *cf_hdls) {
delete cf_hdl;
}
cf_hdls->clear();
delete *db;
*db = nullptr;
}
bool split_partition(const LocalPartitionSplitContext &lpsc,
                     const ToSplitPartition &tsp,
const std::string &dst_replicas_dir,
const std::string &tmp_split_replicas_dir,
PartitionSplitResult &psr)
{
static const std::string kRdbDirPostfix =
dsn::utils::filesystem::path_combine(dsn::replication::replication_app_base::kDataDir,
dsn::replication::replication_app_base::kRdbDir);
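    // i.e. each replica's rocksdb directory is '<replica_dir>/data/rdb'.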
const auto rdb_dir = dsn::utils::filesystem::path_combine(tsp.replica_dir, kRdbDirPostfix);
fmt::print(stdout, " start to split '{}'\n", rdb_dir);
// 1. Open the original rocksdb in read-only mode.
rocksdb::DBOptions db_opts;
// The following options should be set in Pegasus 2.0 and lower versions.
// db_opts.pegasus_data = true;
// db_opts.pegasus_data_version = pegasus::PEGASUS_DATA_VERSION_MAX;
const std::vector<rocksdb::ColumnFamilyDescriptor> cf_dscs(
{{pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, {}},
{pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, {}}});
std::vector<rocksdb::ColumnFamilyHandle *> cf_hdls;
rocksdb::DB *db = nullptr;
RETURN_FALSE_IF_NOT(open_rocksdb(db_opts, rdb_dir, true, cf_dscs, &cf_hdls, &db), "");
// 2. Get metadata from rocksdb.
// - In Pegasus versions lower than 2.0, the metadata is only stored in the MANIFEST
// file.
// - In Pegasus 2.0, the metadata is stored both in the metadata column family and
// MANIFEST file.
// - Since Pegasus 2.1, the metadata is only stored in the metadata column family.
auto ms = std::make_unique<pegasus::server::meta_store>(rdb_dir.c_str(), db, cf_hdls[1]);
uint64_t last_committed_decree;
RETURN_FALSE_IF_NON_OK(ms->get_last_flushed_decree(&last_committed_decree),
"get_last_flushed_decree from '{}' failed",
rdb_dir);
uint32_t pegasus_data_version;
RETURN_FALSE_IF_NON_OK(
ms->get_data_version(&pegasus_data_version), "get_data_version from '{}' failed", rdb_dir);
uint64_t last_manual_compact_finish_time;
RETURN_FALSE_IF_NON_OK(
ms->get_last_manual_compact_finish_time(&last_manual_compact_finish_time),
"get_last_manual_compact_finish_time from '{}' failed",
rdb_dir);
// 3. Get all live sst files.
std::vector<rocksdb::LiveFileMetaData> files;
db->GetLiveFilesMetaData(&files);
// 4. Close rocksdb.
release_db(&cf_hdls, &db);
// 5. Split the sst files.
auto files_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(
rocksdb::NewThreadPool(static_cast<int>(lpsc.threads_per_partition)));
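    // Reserve capacity up front so that the references to elements of 'psr.fsrs'
    // captured by the jobs below remain valid while new elements are appended.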
psr.fsrs.reserve(files.size());
for (const auto &file : files) {
        // Skip metadata column family files; the metadata will be written manually
        // into the new DBs later.
if (file.column_family_name == pegasus::server::meta_store::META_COLUMN_FAMILY_NAME) {
fmt::print(
stdout, " skip [{}]: {}: {}\n", file.column_family_name, file.db_path, file.name);
continue;
}
        // Record the per-file split result.
psr.fsrs.emplace_back();
auto &sfr = psr.fsrs.back();
sfr.filename = file.name;
sfr.split_counts.resize(lpsc.split_count);
files_thread_pool->SubmitJob([=, &sfr]() {
sfr.success =
split_file(lpsc, tsp, file, tmp_split_replicas_dir, pegasus_data_version, sfr);
});
}
files_thread_pool->WaitForJobsAndJoinAllThreads();
files_thread_pool.reset();
// 6. Create new rocksdb instances for the new partitions.
    // TODO(yingchun): parallelize the following operations with a thread pool if
    // necessary.
    for (uint32_t i = 0; i < lpsc.split_count; i++) {
// The new replica is placed in 'dst_replicas_dir'.
const auto new_replica_dir =
construct_split_directory(dst_replicas_dir, tsp, lpsc.dst_app_id, i);
const auto new_rdb_dir =
dsn::utils::filesystem::path_combine(new_replica_dir, kRdbDirPostfix);
// i. Create the directory for the split rocksdb.
        // TODO(yingchun): make sure it does not exist!
RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(new_rdb_dir),
"create directory '{}' failed",
new_rdb_dir);
// ii. Open new rocksdb.
rocksdb::DBOptions new_db_opts;
new_db_opts.create_if_missing = true;
// Create the 'pegasus_meta_cf' column family.
new_db_opts.create_missing_column_families = true;
RETURN_FALSE_IF_NOT(open_rocksdb(new_db_opts, new_rdb_dir, false, cf_dscs, &cf_hdls, &db),
"");
const auto count_of_new_replica =
psr.key_count_by_dst_replica_dirs.insert({new_replica_dir, -1});
CHECK_TRUE(count_of_new_replica.second);
// iii. Ingest the split sst files to the new rocksdb.
do {
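            // The do {...} while (false) block allows 'break' to skip the remaining
            // ingestion steps when there is nothing to ingest.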
            // Skip the directory if it does not exist.
const auto dst_tmp_rdb_dir =
construct_split_directory(tmp_split_replicas_dir, tsp, lpsc.dst_app_id, i);
if (!dsn::utils::filesystem::directory_exists(dst_tmp_rdb_dir)) {
break;
}
// Gather all files.
rocksdb::IngestExternalFileArg arg;
arg.column_family = cf_hdls[0];
RETURN_FALSE_IF_NOT(
dsn::utils::filesystem::get_subfiles(dst_tmp_rdb_dir, arg.external_files, false),
"get sub-files from '{}' failed",
dst_tmp_rdb_dir);
// Skip empty directory.
if (arg.external_files.empty()) {
break;
}
// Ingest files.
RETURN_FALSE_IF_NON_RDB_OK(db->IngestExternalFiles({arg}),
"ingest files from '{}' to '{}' failed",
dst_tmp_rdb_dir,
new_rdb_dir);
// Optional full compaction.
if (lpsc.post_full_compact) {
RETURN_FALSE_IF_NON_RDB_OK(
db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr),
"full compact rocksdb in '{}' failed",
new_rdb_dir);
}
// Optional data counting.
if (lpsc.post_count) {
std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator({}));
                int64_t new_total_count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
new_total_count++;
}
count_of_new_replica.first->second = new_total_count;
}
} while (false);
// iv. Set metadata to rocksdb.
// - In Pegasus versions lower than 2.0, the metadata is only stored in the MANIFEST
// file.
// - In Pegasus 2.0, the metadata is stored both in the metadata column family and
// MANIFEST file.
// - Since Pegasus 2.1, the metadata is only stored in the metadata column family.
        // TODO(yingchun): these metadata are only written to the metadata column
        // family, not the MANIFEST file, so this tool does not support Pegasus
        // versions lower than 2.0.
        // For Pegasus 2.0, it's necessary to set [pegasus.server] get_meta_store_type
        // = "metacf" when restarting the replica servers after using this tool.
auto new_ms =
std::make_unique<pegasus::server::meta_store>(new_rdb_dir.c_str(), db, cf_hdls[1]);
new_ms->set_data_version(pegasus_data_version);
new_ms->set_last_flushed_decree(last_committed_decree);
new_ms->set_last_manual_compact_finish_time(last_manual_compact_finish_time);
rocksdb::FlushOptions options;
options.wait = true;
RETURN_FALSE_IF_NON_RDB_OK(
db->Flush(options, cf_hdls), "flush rocksdb in '{}' failed", new_rdb_dir);
// v. Close rocksdb.
release_db(&cf_hdls, &db);
// vi. Generate new ".app-info".
dsn::app_info new_ai(tsp.ai);
new_ai.app_name = lpsc.dst_app_name;
new_ai.app_id = static_cast<int32_t>(lpsc.dst_app_id);
new_ai.partition_count = static_cast<int32_t>(lpsc.dst_partition_count);
        // Note that 'init_partition_count', the field used by the online partition
        // split, is reset.
        new_ai.init_partition_count = -1;
dsn::replication::replica_app_info rai(&new_ai);
const auto rai_path = dsn::utils::filesystem::path_combine(
new_replica_dir, dsn::replication::replica_app_info::kAppInfo);
RETURN_FALSE_IF_NON_OK(rai.store(rai_path), "write replica_app_info '{}' failed", rai_path);
// vii. Generate new ".init-info".
dsn::replication::replica_init_info new_rii(tsp.rii);
new_rii.init_offset_in_shared_log = 0;
new_rii.init_offset_in_private_log = 0;
const auto rii_path =
dsn::utils::filesystem::path_combine(new_replica_dir, replica_init_info::kInitInfo);
RETURN_FALSE_IF_NON_OK(dsn::utils::dump_rjobj_to_file(new_rii, rii_path),
"write replica_init_info '{}' failed",
rii_path);
}
if (std::any_of(psr.fsrs.begin(), psr.fsrs.end(), [](const FileSplitResult &fsr) {
return !fsr.success;
})) {
return false;
}
return true;
}
bool split_data_directory(const LocalPartitionSplitContext &lpsc,
const std::string &src_data_dir,
const std::string &dst_data_dir,
DataDirSplitResult &ddsr)
{
static const std::string kReplicasDir =
dsn::utils::filesystem::path_combine(dsn::replication::replication_options::kReplicaAppType,
dsn::replication::replication_options::kRepsDir);
// 1. Collect all replica directories from 'src_data_dir'.
const auto src_replicas_dir = dsn::utils::filesystem::path_combine(src_data_dir, kReplicasDir);
std::vector<std::string> replica_dirs;
RETURN_FALSE_IF_NOT(
dsn::utils::filesystem::get_subdirectories(src_replicas_dir, replica_dirs, false),
"get sub-directories from '{}' failed",
src_replicas_dir);
// 2. Create temporary split directory on 'dst_data_dir'.
const auto tmp_split_replicas_dir = dsn::utils::filesystem::path_combine(dst_data_dir, "split");
RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(tmp_split_replicas_dir),
"create split directory '{}' failed",
tmp_split_replicas_dir);
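    // NOTE: this temporary directory and the intermediate sst files under it are not
    // removed by this tool after the split finishes.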
// 3. Gather partitions to split.
    std::vector<ToSplitPartition> to_split_partitions;
std::set<uint32_t> remain_partition_ids(lpsc.src_partition_ids);
const std::set<std::string> ordered_replica_dirs(replica_dirs.begin(), replica_dirs.end());
for (const auto &replica_dir : ordered_replica_dirs) {
// i. Validate the replica directory.
dsn::app_info ai;
dsn::gpid pid;
std::string hint_message;
if (!replica_stub::validate_replica_dir(replica_dir, ai, pid, hint_message)) {
fmt::print(stderr, "invalid replica dir '{}': {}\n", replica_dir, hint_message);
continue;
}
        CHECK_EQ(pid.get_app_id(), ai.app_id);
        // ii. Warn if a replica with the <dst_app_id> or the <dst_app_name> already
        // exists on disk.
        if (ai.app_id == lpsc.dst_app_id) {
            fmt::print(stdout,
                       "WARNING: there is already a replica '{}' with the same <dst_app_id> {}\n",
                       replica_dir,
                       lpsc.dst_app_id);
        }
        if (ai.app_name == lpsc.dst_app_name) {
            fmt::print(stdout,
                       "WARNING: there is already a replica '{}' with the same <dst_app_name> "
                       "'{}'\n",
                       replica_dir,
                       lpsc.dst_app_name);
        }
        // iii. Skip the replicas that do not belong to the <src_app_id>.
        if (ai.app_id != lpsc.src_app_id) {
            continue;
        }
        // iv. Skip, with a warning, the partitions of the <src_app_id> whose indexes
        // are not in <src_partition_ids>.
        const auto cur_pidx = pid.get_partition_index();
        if (lpsc.src_partition_ids.count(cur_pidx) == 0) {
            fmt::print(stdout,
                       "WARNING: the partition index {} of the <src_app_id> {} is skipped\n",
                       cur_pidx,
                       lpsc.src_app_id);
            continue;
        }
        // v. Check if <src_partition_count> matches.
RETURN_FALSE_IF_NOT(ai.partition_count == lpsc.src_partition_count,
"unmatched <src_partition_count> ({} vs {})",
ai.partition_count,
lpsc.src_partition_count);
        // vi. Check the app status.
        RETURN_FALSE_IF_NOT(ai.status == dsn::app_status::AS_AVAILABLE,
                            "splitting app '{}' in non-AVAILABLE status is not supported",
                            ai.app_name);
        // vii. Check if the app is duplicating or bulk loading.
        RETURN_FALSE_IF_NOT(!ai.duplicating && !ai.is_bulk_loading,
                            "splitting app '{}' which is duplicating or bulk loading is not "
                            "supported",
                            ai.app_name);
        // viii. Load the replica_init_info.
dsn::replication::replica_init_info rii;
const auto rii_path =
dsn::utils::filesystem::path_combine(replica_dir, replica_init_info::kInitInfo);
RETURN_FALSE_IF_NON_OK(dsn::utils::load_rjobj_from_file(rii_path, &rii),
"load replica_init_info from '{}' failed",
rii_path);
        // ix. Gather the replica.
to_split_partitions.push_back({replica_dir, ai, rii, pid.get_partition_index()});
remain_partition_ids.erase(cur_pidx);
}
if (!remain_partition_ids.empty()) {
        fmt::print(stdout,
                   "WARNING: the partitions {} were not found in '{}', skip splitting them\n",
                   fmt::join(remain_partition_ids, ","),
                   src_replicas_dir);
}
// 4. Split the partitions.
const auto dst_replicas_dir = dsn::utils::filesystem::path_combine(dst_data_dir, kReplicasDir);
auto partitions_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(
rocksdb::NewThreadPool(static_cast<int>(lpsc.threads_per_data_dir)));
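    // Reserve capacity up front so that the references to elements of 'ddsr.psrs'
    // captured by the jobs below remain valid while new elements are appended.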
ddsr.psrs.reserve(to_split_partitions.size());
for (const auto &tsp : to_split_partitions) {
        // Record the per-partition split result.
ddsr.psrs.emplace_back();
auto &psr = ddsr.psrs.back();
psr.src_replica_dir = tsp.replica_dir;
partitions_thread_pool->SubmitJob([=, &psr]() {
psr.success = split_partition(lpsc, tsp, dst_replicas_dir, tmp_split_replicas_dir, psr);
});
}
partitions_thread_pool->WaitForJobsAndJoinAllThreads();
if (std::any_of(ddsr.psrs.begin(), ddsr.psrs.end(), [](const PartitionSplitResult &psr) {
return !psr.success;
})) {
return false;
}
return true;
}
bool local_partition_split(command_executor *e, shell_context *sc, arguments args)
{
// 1. Parse parameters.
argh::parser cmd(args.argc, args.argv);
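    // NOTE: pos_args()[0] is the command name itself, so the 8 required positional
    // parameters occupy indexes [1, 8].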
    RETURN_FALSE_IF_NOT(cmd.pos_args().size() >= 9,
"invalid command, should be in the form of '{}'",
local_partition_split_help);
int param_index = 1;
LocalPartitionSplitContext lpsc;
PARSE_STRS(lpsc.src_data_dirs);
PARSE_STRS(lpsc.dst_data_dirs);
PARSE_UINT(lpsc.src_app_id);
PARSE_UINT(lpsc.dst_app_id);
PARSE_UINTS(lpsc.src_partition_ids);
PARSE_UINT(lpsc.src_partition_count);
PARSE_UINT(lpsc.dst_partition_count);
lpsc.dst_app_name = cmd(param_index++).str();
PARSE_OPT_UINT(lpsc.threads_per_data_dir, 1, "threads_per_data_dir");
PARSE_OPT_UINT(lpsc.threads_per_partition, 1, "threads_per_partition");
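    // argh: 'cmd["--flag"]' evaluates to true iff the flag appears on the command
    // line.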
lpsc.post_full_compact = cmd["--post_full_compact"];
lpsc.post_count = cmd["--post_count"];
// 2. Check parameters.
if (!validate_parameters(lpsc)) {
return false;
}
// 3. Split each data directory.
auto data_dirs_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(
rocksdb::NewThreadPool(static_cast<int>(lpsc.src_data_dirs.size())));
CHECK_EQ(lpsc.src_data_dirs.size(), lpsc.dst_data_dirs.size());
std::vector<DataDirSplitResult> ddsrs;
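    // Reserve capacity up front so that the references to elements of 'ddsrs'
    // captured by the jobs below remain valid while new elements are appended.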
ddsrs.reserve(lpsc.src_data_dirs.size());
    for (size_t i = 0; i < lpsc.src_data_dirs.size(); i++) {
const auto &src_data_dir = lpsc.src_data_dirs[i];
const auto &dst_data_dir = lpsc.dst_data_dirs[i];
        // Record the per-data-directory split result.
ddsrs.emplace_back();
auto &ddsr = ddsrs.back();
ddsr.src_data_dir = src_data_dir;
ddsr.dst_data_dir = dst_data_dir;
data_dirs_thread_pool->SubmitJob([=, &ddsr]() {
ddsr.success = split_data_directory(lpsc, src_data_dir, dst_data_dir, ddsr);
});
}
data_dirs_thread_pool->WaitForJobsAndJoinAllThreads();
// 4. Output the result.
dsn::utils::table_printer tp("partition_split_result");
tp.add_title("src_replica");
tp.add_column("dst_replica");
tp.add_column("success");
tp.add_column("key_count");
for (const auto &ddsr : ddsrs) {
for (const auto &psr : ddsr.psrs) {
for (const auto &[new_dst_replica_dir, key_count] : psr.key_count_by_dst_replica_dirs) {
tp.add_row(psr.src_replica_dir);
tp.append_data(new_dst_replica_dir);
tp.append_data(psr.success);
tp.append_data(key_count);
}
}
}
tp.output(std::cout, tp_output_format::kTabular);
return true;
}