// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <fmt/core.h>
#include <algorithm>
#include <cstdint>
#include <queue>
#include <type_traits>
#include "absl/strings/string_view.h"
#include "common//duplication_common.h"
#include "common/common.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "common/replication_enums.h"
#include "common/replication_other_types.h"
#include "dsn.layer2_types.h"
#include "duplication_types.h"
#include "meta/meta_service.h"
#include "meta/meta_state_service_utils.h"
#include "meta_admin_types.h"
#include "meta_duplication_service.h"
#include "metadata_types.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/rpc/serialization.h"
#include "runtime/task/async_calls.h"
#include "utils/api_utilities.h"
#include "utils/blob.h"
#include "utils/chrono_literals.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
#include "utils/string_conv.h"
#include "utils/zlocks.h"
DSN_DECLARE_bool(dup_ignore_other_cluster_ids);
namespace dsn {
namespace replication {
using namespace literals::chrono_literals;
// ThreadPool(READ): THREAD_POOL_META_SERVER
void meta_duplication_service::query_duplication_info(const duplication_query_request &request,
duplication_query_response &response)
{
LOG_INFO("query duplication info for app: {}", request.app_name);
response.err = ERR_OK;
{
zauto_read_lock l(app_lock());
std::shared_ptr<app_state> app = _state->get_app(request.app_name);
if (!app || app->status != app_status::AS_AVAILABLE) {
response.err = ERR_APP_NOT_EXIST;
return;
}
response.appid = app->app_id;
for (const auto & [ _, dup ] : app->duplications) {
dup->append_if_valid_for_query(*app, response.entry_list);
}
}
}
// ThreadPool(WRITE): THREAD_POOL_META_STATE
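// Modify the status and/or fail mode of an existing duplication of the given app.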
void meta_duplication_service::modify_duplication(duplication_modify_rpc rpc)
{
const auto &request = rpc.request();
auto &response = rpc.response();
LOG_INFO("modify duplication({}) to [status={},fail_mode={}] for app({})",
request.dupid,
request.__isset.status ? duplication_status_to_string(request.status) : "nil",
request.__isset.fail_mode ? duplication_fail_mode_to_string(request.fail_mode) : "nil",
request.app_name);
dupid_t dupid = request.dupid;
std::shared_ptr<app_state> app = _state->get_app(request.app_name);
if (!app || app->status != app_status::AS_AVAILABLE) {
response.err = ERR_APP_NOT_EXIST;
return;
}
auto it = app->duplications.find(dupid);
if (it == app->duplications.end()) {
response.err = ERR_OBJECT_NOT_FOUND;
return;
}
duplication_info_s_ptr dup = it->second;
auto to_status = request.__isset.status ? request.status : dup->status();
auto to_fail_mode = request.__isset.fail_mode ? request.fail_mode : dup->fail_mode();
response.err = dup->alter_status(to_status, to_fail_mode);
if (response.err != ERR_OK) {
return;
}
if (!dup->is_altering()) {
return;
}
// validation passed
do_modify_duplication(app, dup, rpc);
}
// ThreadPool(WRITE): THREAD_POOL_META_STATE
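// Persist the altered status on the meta storage. If the target status is DS_REMOVED, the
// duplication node is deleted recursively and the duplication is erased from the app.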
void meta_duplication_service::do_modify_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_modify_rpc &rpc)
{
if (rpc.request().status == duplication_status::DS_REMOVED) {
_meta_svc->get_meta_storage()->delete_node_recursively(
std::string(dup->store_path), [rpc, this, app, dup]() {
dup->persist_status();
rpc.response().err = ERR_OK;
rpc.response().appid = app->app_id;
if (rpc.request().status == duplication_status::DS_REMOVED) {
zauto_write_lock l(app_lock());
app->duplications.erase(dup->id);
refresh_duplicating_no_lock(app);
}
});
return;
}
// Store the duplication with the requested status.
blob value = dup->to_json_blob();
_meta_svc->get_meta_storage()->set_data(
std::string(dup->store_path), std::move(value), [rpc, app, dup]() {
dup->persist_status();
rpc.response().err = ERR_OK;
rpc.response().appid = app->app_id;
});
}
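// Helper macros used by the RPC handlers below: format a hint message, attach it to the
// response, log it at the given level and return. The *_IF_NOT variants first check a
// condition and set the response error code only when the condition does not hold.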
#define LOG_DUP_HINT_AND_RETURN(resp, level, ...) \
do { \
const std::string _msg(fmt::format(__VA_ARGS__)); \
(resp).__set_hint(_msg); \
LOG(level, _msg); \
return; \
} while (0)
#define LOG_DUP_HINT_AND_RETURN_IF_NOT(expr, resp, ec, level, ...) \
do { \
if (dsn_likely(expr)) { \
break; \
} \
\
(resp).err = (ec); \
LOG_DUP_HINT_AND_RETURN(resp, level, __VA_ARGS__); \
} while (0)
#define LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(expr, resp, ec, ...) \
LOG_DUP_HINT_AND_RETURN_IF_NOT(expr, resp, ec, LOG_LEVEL_WARNING, __VA_ARGS__)
#define LOG_ERROR_DUP_HINT_AND_RETURN_IF_NOT(expr, resp, ec, ...) \
LOG_DUP_HINT_AND_RETURN_IF_NOT(expr, resp, ec, LOG_LEVEL_ERROR, __VA_ARGS__)
// This call will not recreate the duplication if one with the same app name and
// remote endpoint already exists.
// ThreadPool(WRITE): THREAD_POOL_META_STATE
void meta_duplication_service::add_duplication(duplication_add_rpc rpc)
{
const auto &request = rpc.request();
auto &response = rpc.response();
std::string remote_app_name;
if (request.__isset.remote_app_name) {
remote_app_name = request.remote_app_name;
} else {
// If remote_app_name is not specified by the client, use the source app_name as the
// remote_app_name to stay compatible with old clients (< v2.6.0).
remote_app_name = request.app_name;
}
int32_t remote_replica_count =
request.__isset.remote_replica_count ? request.remote_replica_count : 0;
LOG_INFO("add duplication for app({}), remote cluster name is {}, "
"remote app name is {}, remote replica count is {}",
request.app_name,
request.remote_cluster_name,
remote_app_name,
remote_replica_count);
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(request.remote_cluster_name !=
get_current_cluster_name(),
response,
ERR_INVALID_PARAMETERS,
"illegal operation: adding duplication to itself");
if (!FLAGS_dup_ignore_other_cluster_ids) {
auto remote_cluster_id = get_duplication_cluster_id(request.remote_cluster_name);
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(remote_cluster_id.is_ok(),
response,
ERR_INVALID_PARAMETERS,
"get_duplication_cluster_id({}) failed, error: {}",
request.remote_cluster_name,
remote_cluster_id.get_error());
}
std::vector<host_port> meta_list;
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(
dsn::replication::replica_helper::load_servers_from_config(
duplication_constants::kClustersSectionName, request.remote_cluster_name, meta_list),
response,
ERR_INVALID_PARAMETERS,
"failed to find cluster[{}] address in config [{}]",
request.remote_cluster_name,
duplication_constants::kClustersSectionName);
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(
remote_replica_count >= 0,
response,
ERR_INVALID_PARAMETERS,
"invalid remote_replica_count({}) which should never be negative",
remote_replica_count);
std::shared_ptr<app_state> app;
duplication_info_s_ptr dup;
{
zauto_read_lock l(app_lock());
app = _state->get_app(request.app_name);
// The reason for using !!app rather than just app is that passing a std::shared_ptr into
// dsn_likely (i.e. __builtin_expect) would lead to the compilation error "cannot convert
// 'std::shared_ptr<dsn::replication::app_state>' to 'long int'".
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(
!!app, response, ERR_APP_NOT_EXIST, "app {} was not found", request.app_name);
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(app->status == app_status::AS_AVAILABLE,
response,
ERR_APP_NOT_EXIST,
"app status was not AS_AVAILABLE: name={}, "
"status={}",
request.app_name,
enum_to_string(app->status));
for (const auto & [ _, dup_info ] : app->duplications) {
if (dup_info->remote_cluster_name == request.remote_cluster_name) {
dup = dup_info;
break;
}
}
if (remote_replica_count == 0) {
// 0 means the replica count of the remote app will be the same as that of the
// source app.
remote_replica_count = app->max_replica_count;
}
if (dup) {
// A duplication from the same app to the same remote cluster already exists.
remote_app_name = dup->remote_app_name;
remote_replica_count = dup->remote_replica_count;
LOG_INFO("no need to add duplication, since it has existed: app_name={}, "
"remote_cluster_name={}, remote_app_name={}",
request.app_name,
request.remote_cluster_name,
remote_app_name);
} else {
// Check if other apps of this cluster are duplicated to the same remote app.
for (const auto & [ app_name, cur_app_state ] : _state->_exist_apps) {
if (app_name == request.app_name) {
// Skip this app since we want to check other apps.
continue;
}
for (const auto & [ _, dup_info ] : cur_app_state->duplications) {
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(
dup_info->remote_cluster_name != request.remote_cluster_name ||
dup_info->remote_app_name != remote_app_name,
response,
ERR_INVALID_PARAMETERS,
"illegal operation: another app({}) is also "
"duplicated to the same remote app("
"cluster={}, app={})",
app_name,
request.remote_cluster_name,
remote_app_name);
}
}
}
}
if (!dup) {
dup = new_dup_from_init(request.remote_cluster_name,
remote_app_name,
remote_replica_count,
std::move(meta_list),
app);
}
do_add_duplication(app, dup, rpc, remote_app_name, remote_replica_count);
}
// ThreadPool(WRITE): THREAD_POOL_META_STATE
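// Start the duplication and persist it on the meta storage; the response is filled and the
// duplicating flag of the app is refreshed once the new node has been created successfully.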
void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_add_rpc &rpc,
const std::string &remote_app_name,
const int32_t remote_replica_count)
{
const auto &ec = dup->start(rpc.request().is_duplicating_checkpoint);
LOG_ERROR_DUP_HINT_AND_RETURN_IF_NOT(ec == ERR_OK,
rpc.response(),
ec,
"start dup[{}({})] failed: err = {}",
app->app_name,
dup->id,
ec);
auto value = dup->to_json_blob();
std::queue<std::string> nodes({get_duplication_path(*app), std::to_string(dup->id)});
_meta_svc->get_meta_storage()->create_node_recursively(
std::move(nodes),
std::move(value),
[app, this, dup, rpc, remote_app_name, remote_replica_count]() mutable {
LOG_INFO("[{}] add duplication successfully [app_name: {}, follower: {}]",
dup->log_prefix(),
app->app_name,
dup->remote_cluster_name);
// The duplication starts only after it's been persisted.
dup->persist_status();
auto &resp = rpc.response();
resp.err = ERR_OK;
resp.appid = app->app_id;
resp.dupid = dup->id;
resp.__set_remote_app_name(remote_app_name);
resp.__set_remote_replica_count(remote_replica_count);
zauto_write_lock l(app_lock());
refresh_duplicating_no_lock(app);
});
}
/// Collect all available apps that have duplications and for which node `ns` serves at
/// least one primary partition.
void meta_duplication_service::get_all_available_app(
const node_state &ns, std::map<int32_t, std::shared_ptr<app_state>> &app_map) const
{
ns.for_each_partition([this, &ns, &app_map](const gpid &pid) -> bool {
if (ns.served_as(pid) != partition_status::PS_PRIMARY) {
return true;
}
std::shared_ptr<app_state> app = _state->get_app(pid.get_app_id());
if (!app || app->status != app_status::AS_AVAILABLE) {
return true;
}
// must have duplication
if (app->duplications.empty()) {
return true;
}
if (app_map.find(app->app_id) == app_map.end()) {
app_map.emplace(std::make_pair(pid.get_app_id(), std::move(app)));
}
return true;
});
}
// ThreadPool(WRITE): THREAD_POOL_META_STATE
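// Handle the periodic duplication-sync RPC from a replica server: collect the duplication
// entries of the available apps whose primaries are served on that node, drive the
// DS_PREPARE -> DS_APP -> DS_LOG transitions, and update the confirmed decrees reported
// by the replicas.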
void meta_duplication_service::duplication_sync(duplication_sync_rpc rpc)
{
auto &request = rpc.request();
auto &response = rpc.response();
response.err = ERR_OK;
host_port src_hp;
GET_HOST_PORT(request, node, src_hp);
const auto *ns = get_node_state(_state->_nodes, src_hp, false);
if (ns == nullptr) {
LOG_WARNING("node({}) is not found in meta server", FMT_HOST_PORT_AND_IP(request, node));
response.err = ERR_OBJECT_NOT_FOUND;
return;
}
std::map<int32_t, std::shared_ptr<app_state>> app_map;
get_all_available_app(*ns, app_map);
for (const auto &kv : app_map) {
int32_t app_id = kv.first;
const auto &app = kv.second;
for (const auto &kv2 : app->duplications) {
dupid_t dup_id = kv2.first;
const auto &dup = kv2.second;
if (dup->is_invalid_status()) {
continue;
}
if (dup->status() < duplication_status::DS_LOG && dup->all_checkpoint_has_prepared()) {
if (dup->status() == duplication_status::DS_PREPARE) {
create_follower_app_for_duplication(dup, app);
} else if (dup->status() == duplication_status::DS_APP) {
check_follower_app_if_create_completed(dup);
}
}
response.dup_map[app_id][dup_id] = dup->to_duplication_entry();
// Report progress periodically for each duplication.
dup->report_progress_if_time_up();
}
}
/// update progress
for (const auto &kv : request.confirm_list) {
gpid gpid = kv.first;
auto it = app_map.find(gpid.get_app_id());
if (it == app_map.end()) {
// The app is unsynced.
// Since duplication-sync is separate from config-sync, it's not guaranteed to have the
// latest state; duplication-sync only requires loose consistency.
continue;
}
std::shared_ptr<app_state> &app = it->second;
for (const duplication_confirm_entry &confirm : kv.second) {
auto it2 = app->duplications.find(confirm.dupid);
if (it2 == app->duplications.end()) {
// dup is unsynced
continue;
}
duplication_info_s_ptr &dup = it2->second;
if (dup->is_invalid_status()) {
continue;
}
do_update_partition_confirmed(dup, rpc, gpid.get_partition_index(), confirm);
}
}
}
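// Create the follower table on the remote cluster via RPC_CM_CREATE_APP; on success the
// duplication is altered to DS_APP and the new status is persisted on the meta storage.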
void meta_duplication_service::create_follower_app_for_duplication(
const std::shared_ptr<duplication_info> &dup, const std::shared_ptr<app_state> &app)
{
configuration_create_app_request request;
request.app_name = dup->remote_app_name;
request.options.app_type = app->app_type;
request.options.partition_count = app->partition_count;
request.options.replica_count = dup->remote_replica_count;
request.options.success_if_exist = false;
request.options.envs = app->envs;
request.options.is_stateful = app->is_stateful;
// Add envs for the follower table, from which it knows that it is a `follower` and can
// load the master info:
// `kDuplicationEnvMasterClusterKey => {master_cluster_name}`
// `kDuplicationEnvMasterMetasKey => {master_meta_list}`
// `kDuplicationEnvMasterAppNameKey => {master_app_name}`
request.options.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
get_current_cluster_name());
request.options.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey,
_meta_svc->get_meta_list_string());
request.options.envs.emplace(duplication_constants::kDuplicationEnvMasterAppNameKey,
app->app_name);
host_port meta_servers;
meta_servers.assign_group(dup->remote_cluster_name.c_str());
meta_servers.group_host_port()->add_list(dup->remote_cluster_metas);
dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_CREATE_APP);
dsn::marshall(msg, request);
rpc::call(
dsn::dns_resolver::instance().resolve_address(meta_servers),
msg,
_meta_svc->tracker(),
[=](error_code err, configuration_create_app_response &&resp) mutable {
FAIL_POINT_INJECT_NOT_RETURN_F("update_app_request_ok",
[&](absl::string_view s) -> void { err = ERR_OK; });
error_code create_err = err == ERR_OK ? resp.err : err;
error_code update_err = ERR_NO_NEED_OPERATE;
FAIL_POINT_INJECT_NOT_RETURN_F(
"persist_dup_status_failed",
[&](absl::string_view s) -> void { create_err = ERR_OK; });
if (create_err == ERR_OK) {
update_err = dup->alter_status(duplication_status::DS_APP);
}
FAIL_POINT_INJECT_F("persist_dup_status_failed",
[&](absl::string_view s) -> void { return; });
if (update_err == ERR_OK) {
blob value = dup->to_json_blob();
// Note: set_data() is asynchronous, so the new status may not have been persisted when
// this call returns. `_is_altering` marks the update as still in progress; while it is
// set, dup->alter_status() will return `ERR_BUSY`.
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
std::move(value),
[=]() { dup->persist_status(); });
} else {
LOG_ERROR("created follower app[{}.{}] to trigger duplicate checkpoint failed: "
"duplication_status = {}, create_err = {}, update_err = {}",
dup->remote_cluster_name,
dup->app_name,
duplication_status_to_string(dup->status()),
create_err,
update_err);
}
});
}
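// Query the partition configurations of the follower table from the remote cluster to check
// whether it has become healthy (each partition has a primary and at least one secondary);
// if so, alter the duplication to DS_LOG and persist the new status.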
void meta_duplication_service::check_follower_app_if_create_completed(
const std::shared_ptr<duplication_info> &dup)
{
host_port meta_servers;
meta_servers.assign_group(dup->remote_cluster_name.c_str());
meta_servers.group_host_port()->add_list(dup->remote_cluster_metas);
query_cfg_request meta_config_request;
meta_config_request.app_name = dup->app_name;
dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX);
dsn::marshall(msg, meta_config_request);
rpc::call(dsn::dns_resolver::instance().resolve_address(meta_servers),
msg,
_meta_svc->tracker(),
[=](error_code err, query_cfg_response &&resp) mutable {
FAIL_POINT_INJECT_NOT_RETURN_F("create_app_ok", [&](absl::string_view s) -> void {
err = ERR_OK;
int count = dup->partition_count;
while (count-- > 0) {
const host_port primary("localhost", 34801);
const host_port secondary1("localhost", 34802);
const host_port secondary2("localhost", 34803);
partition_configuration p;
SET_IP_AND_HOST_PORT_BY_DNS(p, primary, primary);
SET_IPS_AND_HOST_PORTS_BY_DNS(p, secondaries, secondary1, secondary2);
resp.partitions.emplace_back(p);
}
});
// - ERR_INCONSISTENT_STATE: the partition count in the response doesn't match the local one
// - ERR_INACTIVE_STATE: the follower table is not yet healthy
error_code query_err = err == ERR_OK ? resp.err : err;
if (query_err == ERR_OK) {
if (resp.partitions.size() != dup->partition_count) {
query_err = ERR_INCONSISTENT_STATE;
} else {
for (const auto &partition : resp.partitions) {
if (!partition.hp_primary) {
query_err = ERR_INACTIVE_STATE;
break;
}
if (partition.hp_secondaries.empty()) {
query_err = ERR_NOT_ENOUGH_MEMBER;
break;
}
for (const auto &secondary : partition.hp_secondaries) {
if (!secondary) {
query_err = ERR_INACTIVE_STATE;
break;
}
}
}
}
}
error_code update_err = ERR_NO_NEED_OPERATE;
if (query_err == ERR_OK) {
update_err = dup->alter_status(duplication_status::DS_LOG);
}
FAIL_POINT_INJECT_F("persist_dup_status_failed",
[&](absl::string_view s) -> void { return; });
if (update_err == ERR_OK) {
blob value = dup->to_json_blob();
// Note: set_data() is asynchronous, so the new status may not have been persisted when
// this call returns. `_is_altering` marks the update as still in progress; while it is
// set, dup->alter_status() will return `ERR_BUSY`.
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
std::move(value),
[dup]() { dup->persist_status(); });
} else {
LOG_ERROR(
"query follower app[{}.{}] replica configuration completed, result: "
"duplication_status = {}, query_err = {}, update_err = {}",
dup->remote_cluster_name,
dup->app_name,
duplication_status_to_string(dup->status()),
query_err,
update_err);
}
});
}
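// Persist the confirmed decree of a partition on the meta storage if it advances the
// in-memory progress, and fill the updated progress into the duplication-sync response.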
void meta_duplication_service::do_update_partition_confirmed(
duplication_info_s_ptr &dup,
duplication_sync_rpc &rpc,
int32_t partition_idx,
const duplication_confirm_entry &confirm_entry)
{
if (dup->alter_progress(partition_idx, confirm_entry)) {
std::string path = get_partition_path(dup, std::to_string(partition_idx));
blob value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree));
_meta_svc->get_meta_storage()->get_data(std::string(path), [=](const blob &data) mutable {
if (data.length() == 0) {
_meta_svc->get_meta_storage()->create_node(
std::string(path), std::move(value), [=]() mutable {
dup->persist_progress(partition_idx);
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
confirm_entry.confirmed_decree;
});
} else {
_meta_svc->get_meta_storage()->set_data(
std::string(path), std::move(value), [=]() mutable {
dup->persist_progress(partition_idx);
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
confirm_entry.confirmed_decree;
});
}
// duplication_sync_rpc will finally be replied to once the confirmed points
// of all partitions have been stored.
});
}
}
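// Create a new duplication_info with a dupid derived from the current timestamp (bumped
// until unique within the app), register it under the app, and initialize the progress of
// every partition to invalid_decree.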
std::shared_ptr<duplication_info>
meta_duplication_service::new_dup_from_init(const std::string &remote_cluster_name,
const std::string &remote_app_name,
const int32_t remote_replica_count,
std::vector<host_port> &&remote_cluster_metas,
std::shared_ptr<app_state> &app) const
{
duplication_info_s_ptr dup;
// Use current time to identify this duplication.
auto dupid = static_cast<dupid_t>(dsn_now_s());
{
zauto_write_lock l(app_lock());
// Hold write lock here to ensure that dupid is unique.
for (; app->duplications.find(dupid) != app->duplications.end(); ++dupid) {
}
std::string dup_path = get_duplication_path(*app, std::to_string(dupid));
dup = std::make_shared<duplication_info>(dupid,
app->app_id,
app->app_name,
app->partition_count,
remote_replica_count,
dsn_now_ms(),
remote_cluster_name,
remote_app_name,
std::move(remote_cluster_metas),
std::move(dup_path));
for (int32_t i = 0; i < app->partition_count; i++) {
dup->init_progress(i, invalid_decree);
}
app->duplications.emplace(dup->id, dup);
}
return dup;
}
// ThreadPool(WRITE): THREAD_POOL_META_STATE
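// Rebuild the in-memory duplication states of all available apps from the meta storage
// during meta state recovery.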
void meta_duplication_service::recover_from_meta_state()
{
LOG_INFO("recovering duplication states from meta storage");
// /<app>/duplication/<dupid>/<partition_idx>
//                        |       |-> confirmed_decree
//                        |
//                        |-> json of dup info
for (const auto &kv : _state->_exist_apps) {
std::shared_ptr<app_state> app = kv.second;
if (app->status != app_status::AS_AVAILABLE) {
continue;
}
_meta_svc->get_meta_storage()->get_children(
get_duplication_path(*app),
[this, app](bool node_exists, const std::vector<std::string> &dup_id_list) {
if (!node_exists) {
// if there's no duplication
return;
}
for (const std::string &raw_dup_id : dup_id_list) {
dupid_t dup_id;
if (!buf2int32(raw_dup_id, dup_id)) {
// unlikely
LOG_ERROR("invalid duplication path: {}",
get_duplication_path(*app, raw_dup_id));
return;
}
do_restore_duplication(dup_id, app);
}
});
}
}
// ThreadPool(WRITE): THREAD_POOL_META_STATE
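// Restore the confirmed decree of every partition of the duplication from the meta storage;
// partitions without a stored value are initialized to invalid_decree.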
void meta_duplication_service::do_restore_duplication_progress(
const duplication_info_s_ptr &dup, const std::shared_ptr<app_state> &app)
{
for (int partition_idx = 0; partition_idx < app->partition_count; partition_idx++) {
std::string str_pidx = std::to_string(partition_idx);
// <app_path>/duplication/<dup_id>/<partition_index>
std::string partition_path = get_partition_path(dup, str_pidx);
_meta_svc->get_meta_storage()->get_data(
std::move(partition_path), [dup, partition_idx](const blob &value) {
// value is confirmed_decree encoded in string.
if (value.size() == 0) {
// not found
dup->init_progress(partition_idx, invalid_decree);
return;
}
int64_t confirmed_decree = invalid_decree;
if (!buf2int64(value.to_string_view(), confirmed_decree)) {
LOG_ERROR("[{}] invalid confirmed_decree {} on partition_idx {}",
dup->log_prefix(),
value,
partition_idx);
return; // fail fast
}
dup->init_progress(partition_idx, confirmed_decree);
LOG_INFO(
"[{}] initialize progress from metastore [partition_idx: {}, confirmed: {}]",
dup->log_prefix(),
partition_idx,
confirmed_decree);
});
}
}
// ThreadPool(WRITE): THREAD_POOL_META_STATE
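// Restore a single duplication from its JSON blob on the meta storage, register it under
// the app if its status is valid, and then restore its per-partition progress.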
void meta_duplication_service::do_restore_duplication(dupid_t dup_id,
std::shared_ptr<app_state> app)
{
std::string store_path = get_duplication_path(*app, std::to_string(dup_id));
// restore duplication info from json
_meta_svc->get_meta_storage()->get_data(
std::string(store_path),
[ dup_id, this, app = std::move(app), store_path ](const blob &json) {
zauto_write_lock l(app_lock());
auto dup = duplication_info::decode_from_blob(dup_id,
app->app_id,
app->app_name,
app->partition_count,
app->max_replica_count,
store_path,
json);
if (nullptr == dup) {
LOG_ERROR("failed to decode json \"{}\" on path {}", json, store_path);
return; // fail fast
}
if (!dup->is_invalid_status()) {
app->duplications[dup->id] = dup;
refresh_duplicating_no_lock(app);
// restore progress
do_restore_duplication_progress(dup, app);
}
});
}
} // namespace replication
} // namespace dsn