// blob: 33610fc6a4c667e62bcc7558a17a7b2199d6becc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <fmt/core.h>
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <functional>
#include <map>
#include <utility>
#include <vector>
#include "common/gpid.h"
#include "common/json_helper.h"
#include "common/replica_envs.h"
#include "common/replication.codes.h"
#include "common/replication_enums.h"
#include "common/replication_other_types.h"
#include "dsn.layer2_types.h"
#include "meta/meta_data.h"
#include "meta/meta_service.h"
#include "meta/meta_state_service.h"
#include "meta/server_state.h"
#include "meta/table_metrics.h"
#include "meta_admin_types.h"
#include "meta_split_service.h"
#include "meta_state_service_utils.h"
#include "metadata_types.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_holder.h"
#include "runtime/task/async_calls.h"
#include "utils/blob.h"
#include "utils/error_code.h"
#include "utils/fmt_logging.h"
#include "utils/zlocks.h"
namespace dsn {
namespace replication {
// Builds the split service on top of an existing meta service.
// `meta_srv` is dereferenced immediately to obtain the server state, so it must not be null.
meta_split_service::meta_split_service(meta_service *meta_srv)
{
    // Both members are derived from the argument only, so their assignment order is irrelevant.
    _state = meta_srv->get_server_state();
    _meta_svc = meta_srv;
}
// Entry point for a "start partition split" request.
//
// Under the app lock the request is rejected when:
//   - the app does not exist (ERR_APP_NOT_EXIST) or is not available (ERR_APP_DROPPED);
//   - new_partition_count is not exactly twice the current count (ERR_INVALID_PARAMETERS);
//   - the app is already splitting (ERR_BUSY).
// Otherwise the lock is released and the request is handed to do_start_partition_split().
void meta_split_service::start_partition_split(start_split_rpc rpc)
{
    const auto &req = rpc.request();
    auto &resp = rpc.response();
    resp.err = ERR_OK;

    std::shared_ptr<app_state> target;
    {
        zauto_write_lock l(app_lock());
        target = _state->get_app(req.app_name);

        const bool missing = (target == nullptr);
        if (missing || target->status != app_status::AS_AVAILABLE) {
            LOG_ERROR("app({}) is not existed or not available", req.app_name);
            if (missing) {
                resp.err = ERR_APP_NOT_EXIST;
                resp.hint_msg = fmt::format("app {}", "not existed");
            } else {
                resp.err = ERR_APP_DROPPED;
                resp.hint_msg = fmt::format("app {}", "dropped");
            }
            return;
        }

        // A split always doubles the partition count; any other target is invalid.
        const auto expected_count = target->partition_count * 2;
        if (req.new_partition_count != expected_count) {
            LOG_ERROR(
                "wrong partition count: app({}), partition count({}), new_partition_count({})",
                req.app_name,
                target->partition_count,
                req.new_partition_count);
            resp.err = ERR_INVALID_PARAMETERS;
            resp.hint_msg = fmt::format("wrong partition_count, should be {}", expected_count);
            return;
        }

        if (target->splitting()) {
            resp.err = ERR_BUSY;
            resp.hint_msg =
                fmt::format("app({}) is already executing partition split", req.app_name);
            LOG_ERROR("{}", resp.hint_msg);
            return;
        }
    }

    LOG_INFO("app({}) start to partition split, new_partition_count={}",
             req.app_name,
             req.new_partition_count);
    do_start_partition_split(std::move(target), std::move(rpc));
}
void meta_split_service::do_start_partition_split(std::shared_ptr<app_state> app,
start_split_rpc rpc)
{
auto on_write_storage_complete = [app, rpc, this]() {
LOG_INFO("app({}) update partition count on remote storage, new partition_count = {}",
app->app_name,
app->partition_count * 2);
zauto_write_lock l(app_lock());
app->helpers->split_states.splitting_count = app->partition_count;
app->partition_count *= 2;
app->helpers->contexts.resize(app->partition_count);
app->partitions.resize(app->partition_count);
_state->get_table_metric_entities().resize_partitions(app->app_id, app->partition_count);
app->envs[replica_envs::SPLIT_VALIDATE_PARTITION_HASH] = "true";
for (int i = 0; i < app->partition_count; ++i) {
app->helpers->contexts[i].config_owner = &app->partitions[i];
if (i >= app->partition_count / 2) { // child partitions
app->partitions[i].ballot = invalid_ballot;
app->partitions[i].pid = gpid(app->app_id, i);
} else { // parent partitions
app->helpers->split_states.status[i] = split_status::SPLITTING;
}
}
auto &response = rpc.response();
response.err = ERR_OK;
};
if (app->init_partition_count <= 0) {
app->init_partition_count = app->partition_count;
}
auto copy = *app;
copy.partition_count *= 2;
copy.envs[replica_envs::SPLIT_VALIDATE_PARTITION_HASH] = "true";
blob value = dsn::json::json_forwarder<app_info>::encode(copy);
_meta_svc->get_meta_storage()->set_data(
_state->get_app_path(*app), std::move(value), on_write_storage_complete);
}
// Handles a parent partition's request to register its child partition.
//
// The response starts as ERR_IO_PENDING and is overwritten on every rejection path except
// "another request is already syncing", which only logs a warning. Rejections:
//   - request parent ballot differs from the local one           -> ERR_INVALID_VERSION
//   - child index is outside the current partition count        -> ERR_INVALID_STATE
//   - parent has no split status entry (already registered)     -> ERR_CHILD_REGISTERED
//   - parent split status is not SPLITTING                      -> ERR_INVALID_STATE
// On acceptance the parent's split entry is removed, the splitting counter is decremented and
// the child configuration is written to remote storage.
void meta_split_service::register_child_on_meta(register_child_rpc rpc)
{
    const auto &req = rpc.request();
    auto &resp = rpc.response();
    const std::string &app_name = req.app.app_name;
    resp.err = ERR_IO_PENDING;

    zauto_write_lock l(app_lock());

    std::shared_ptr<app_state> app = _state->get_app(app_name);
    CHECK(app, "app({}) is not existed", app_name);
    CHECK(app->is_stateful, "app({}) is stateless currently", app_name);

    const gpid &parent_gpid = req.parent_config.pid;
    const gpid &child_gpid = req.child_config.pid;
    const auto parent_pidx = parent_gpid.get_partition_index();
    const auto child_pidx = child_gpid.get_partition_index();

    const auto &local_parent = app->partitions[parent_pidx];
    if (local_parent.ballot != req.parent_config.ballot) {
        LOG_ERROR("app({}) partition({}) register child({}) failed, request is outdated, request "
                  "parent ballot = {}, local parent ballot = {}",
                  app_name,
                  parent_gpid,
                  child_gpid,
                  req.parent_config.ballot,
                  local_parent.ballot);
        resp.err = ERR_INVALID_VERSION;
        resp.parent_config = local_parent;
        return;
    }

    config_context &parent_ctx = app->helpers->contexts[parent_pidx];
    if (parent_ctx.stage == config_status::pending_remote_sync) {
        // The response keeps ERR_IO_PENDING in this case.
        LOG_WARNING("app({}) partition({}): another request is syncing with remote storage, ignore "
                    "this request",
                    app_name,
                    parent_gpid);
        return;
    }

    if (child_pidx >= app->partition_count) {
        LOG_ERROR(
            "app({}) partition({}) register child({}) failed, partition split has been canceled",
            app_name,
            parent_gpid,
            child_gpid);
        resp.err = ERR_INVALID_STATE;
        resp.parent_config = local_parent;
        return;
    }

    auto &split_status_map = app->helpers->split_states.status;
    auto status_it = split_status_map.find(parent_pidx);
    if (status_it == split_status_map.end()) {
        LOG_ERROR(
            "duplicated register request, app({}) child partition({}) has already been registered",
            app_name,
            child_gpid);
        const auto &child_config = app->partitions[child_pidx];
        CHECK_GT_MSG(child_config.ballot,
                     0,
                     "app({}) partition({}) should have been registered",
                     app_name,
                     child_gpid);
        resp.err = ERR_CHILD_REGISTERED;
        resp.parent_config = local_parent;
        return;
    }

    if (status_it->second != split_status::SPLITTING) {
        LOG_ERROR(
            "app({}) partition({}) register child({}) failed, current partition split_status = {}",
            app_name,
            parent_gpid,
            child_gpid,
            dsn::enum_to_string(status_it->second));
        resp.err = ERR_INVALID_STATE;
        return;
    }

    // The parent is done splitting from meta's point of view.
    split_status_map.erase(status_it);
    --app->helpers->split_states.splitting_count;

    LOG_INFO("app({}) parent({}) will register child({})", app_name, parent_gpid, child_gpid);

    parent_ctx.stage = config_status::pending_remote_sync;
    parent_ctx.msg = rpc.dsn_request();
    parent_ctx.pending_sync_task = add_child_on_remote_storage(rpc, true);
}
// Writes the child partition configuration carried by `rpc` to remote storage.
//
// create_new == true  : the partition node is created with the encoded configuration.
// create_new == false : the data of the partition node is overwritten.
// In both cases on_add_child_on_remote_storage_reply() receives the storage error code
// together with `rpc` and `create_new`. Returns the task of the storage operation.
dsn::task_ptr meta_split_service::add_child_on_remote_storage(register_child_rpc rpc,
                                                              bool create_new)
{
    const auto &child_config = rpc.request().child_config;
    const std::string node_path = _state->get_partition_path(child_config.pid);
    blob encoded = dsn::json::json_forwarder<partition_configuration>::encode(child_config);

    // Single completion handler shared by both storage operations.
    auto on_reply = [this, rpc, create_new](error_code ec) {
        on_add_child_on_remote_storage_reply(ec, rpc, create_new);
    };

    if (!create_new) {
        return _meta_svc->get_remote_storage()->set_data(
            node_path, encoded, LPC_META_STATE_HIGH, on_reply, _meta_svc->tracker());
    }
    return _meta_svc->get_remote_storage()->create_node(
        node_path, LPC_META_STATE_HIGH, on_reply, encoded);
}
// Completion handler of add_child_on_remote_storage().
//
// ec         : result of the remote storage operation.
// rpc        : the register-child request being served.
// create_new : whether the finished operation was a node creation (true) or a data update.
//
// ERR_TIMEOUT retries the same operation after one second. ERR_NODE_ALREADY_EXIST on a
// creation retries immediately as a data update. Any other error than ERR_OK is fatal.
// On success the child configuration is applied locally, the response is filled if the parent
// context still holds the request message, and the parent context leaves the pending state.
void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec,
                                                              register_child_rpc rpc,
                                                              bool create_new)
{
    const auto &request = rpc.request();
    auto &response = rpc.response();
    const std::string &app_name = request.app.app_name;

    zauto_write_lock l(app_lock());
    std::shared_ptr<app_state> app = _state->get_app(app_name);
    CHECK(app, "app({}) is not existed", app_name);
    CHECK(app->is_stateful, "app({}) is stateless currently", app_name);

    const gpid &parent_gpid = request.parent_config.pid;
    const gpid &child_gpid = request.child_config.pid;
    const auto parent_pidx = parent_gpid.get_partition_index();
    config_context &parent_context = app->helpers->contexts[parent_pidx];

    if (ec == ERR_TIMEOUT ||
        (ec == ERR_NODE_ALREADY_EXIST && create_new)) { // retry register child on remote storage
        const bool retry_create_new = (ec == ERR_TIMEOUT) ? create_new : false;
        const int delay = (ec == ERR_TIMEOUT) ? 1 : 0;
        // The retry task must update the context stored in the app, not a private copy of it,
        // otherwise the storage task it starts is never tracked. The context is therefore looked
        // up again through `app` and the parent index when the task runs.
        parent_context.pending_sync_task = tasking::enqueue(
            LPC_META_STATE_HIGH,
            nullptr,
            [this, app, parent_pidx, rpc, retry_create_new]() {
                // Hold the app lock across both the storage call and the assignment, as
                // register_child_on_meta() does: the reply handler also needs this lock, so it
                // cannot clear pending_sync_task before it has been stored here.
                zauto_write_lock retry_lock(app_lock());
                app->helpers->contexts[parent_pidx].pending_sync_task =
                    add_child_on_remote_storage(rpc, retry_create_new);
            },
            0,
            std::chrono::seconds(delay));
        return;
    }
    CHECK_EQ_MSG(ec, ERR_OK, "we can't handle this right now");

    LOG_INFO("parent({}) resgiter child({}) on remote storage succeed", parent_gpid, child_gpid);

    // update local child partition configuration
    std::shared_ptr<configuration_update_request> update_child_request =
        std::make_shared<configuration_update_request>();
    update_child_request->config = request.child_config;
    update_child_request->info = *app;
    update_child_request->type = config_type::CT_REGISTER_CHILD;
    update_child_request->node = request.primary_address;
    _state->update_configuration_locally(*app, update_child_request);

    // Only fill the response while the parent context still holds the request message.
    if (parent_context.msg) {
        response.err = ERR_OK;
        response.app = *app;
        response.parent_config = app->partitions[parent_pidx];
        response.child_config = app->partitions[child_gpid.get_partition_index()];
        parent_context.msg = nullptr;
    }
    parent_context.pending_sync_task = nullptr;
    parent_context.stage = config_status::not_pending;
}
// Reports the split progress of an app.
//
// Fails with ERR_APP_NOT_EXIST / ERR_APP_DROPPED when the app is missing / not available, and
// with ERR_INVALID_STATE when the app is not splitting. Otherwise the response carries the
// current partition count and a copy of the per-partition split status map.
void meta_split_service::query_partition_split(query_split_rpc rpc) const
{
    auto &response = rpc.response();
    const std::string &app_name = rpc.request().app_name;
    response.err = ERR_OK;

    zauto_read_lock l(app_lock());
    const std::shared_ptr<app_state> app = _state->get_app(app_name);

    const bool missing = (app == nullptr);
    if (missing || app->status != app_status::AS_AVAILABLE) {
        response.err = missing ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED;
        response.__set_hint_msg(
            fmt::format("app({}) {}", app_name, missing ? "not existed" : "dropped"));
        LOG_ERROR("query partition split failed, {}", response.hint_msg);
        return;
    }

    if (!app->splitting()) {
        response.__set_hint_msg(fmt::format("app({}) is not splitting", app_name));
        response.err = ERR_INVALID_STATE;
        LOG_ERROR("query partition split failed, {}", response.hint_msg);
        return;
    }

    response.status = app->helpers->split_states.status;
    response.new_partition_count = app->partition_count;
    LOG_INFO("query partition split succeed, app({}), partition_count({}), splitting_count({})",
             app->app_name,
             response.new_partition_count,
             response.status.size());
}
// Entry point for pause / restart / cancel split requests.
//
// Fails with ERR_APP_NOT_EXIST / ERR_APP_DROPPED when the app is missing / not available, and
// with ERR_INVALID_STATE when the app is not splitting. A PAUSE or RESTART request with a
// non-negative parent_pidx is applied to that single partition; every other request is
// applied to the whole app.
void meta_split_service::control_partition_split(control_split_rpc rpc)
{
    const auto &req = rpc.request();
    const auto &control_type = req.control_type;
    auto &response = rpc.response();

    zauto_write_lock l(app_lock());

    std::shared_ptr<app_state> app = _state->get_app(req.app_name);
    if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
        response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED;
        // The app name is part of the hint, in the same form query_partition_split() uses.
        response.__set_hint_msg(
            fmt::format("app({}) {}",
                        req.app_name,
                        response.err == ERR_APP_NOT_EXIST ? "not existed" : "dropped"));
        LOG_ERROR("{} split failed, {}", control_type_str(control_type), response.hint_msg);
        return;
    }

    if (!app->splitting()) {
        response.err = ERR_INVALID_STATE;
        response.__set_hint_msg(fmt::format("app({}) is not splitting", req.app_name));
        LOG_ERROR("{} split failed, {}", control_type_str(control_type), response.hint_msg);
        return;
    }

    if (req.parent_pidx >= 0 && (control_type == split_control_type::PAUSE ||
                                 control_type == split_control_type::RESTART)) {
        do_control_single(std::move(app), std::move(rpc));
    } else {
        do_control_all(std::move(app), std::move(rpc));
    }
}
void meta_split_service::do_control_single(std::shared_ptr<app_state> app, control_split_rpc rpc)
{
const auto &req = rpc.request();
const std::string &app_name = req.app_name;
const int32_t &parent_pidx = req.parent_pidx;
const auto &control_type = req.control_type;
auto &response = rpc.response();
if (parent_pidx >= app->partition_count / 2) {
response.err = ERR_INVALID_PARAMETERS;
response.__set_hint_msg(fmt::format("invalid parent partition index({})", parent_pidx));
LOG_ERROR("{} split for app({}) failed, {}",
control_type_str(control_type),
app_name,
response.hint_msg);
return;
}
auto iter = app->helpers->split_states.status.find(parent_pidx);
if (iter == app->helpers->split_states.status.end()) {
response.err =
control_type == split_control_type::PAUSE ? ERR_CHILD_REGISTERED : ERR_INVALID_STATE;
response.__set_hint_msg(fmt::format("partition[{}] is not splitting", parent_pidx));
LOG_ERROR("{} split for app({}) failed, {}",
control_type_str(control_type),
app_name,
response.hint_msg);
return;
}
split_status::type old_status =
control_type == split_control_type::PAUSE ? split_status::SPLITTING : split_status::PAUSED;
split_status::type target_status =
control_type == split_control_type::PAUSE ? split_status::PAUSING : split_status::SPLITTING;
if (iter->second == old_status) {
iter->second = target_status;
response.err = ERR_OK;
LOG_INFO("app({}) partition[{}] {} split succeed",
app_name,
parent_pidx,
control_type_str(control_type));
} else {
response.err = ERR_INVALID_STATE;
response.__set_hint_msg(fmt::format("partition[{}] wrong split_status({})",
parent_pidx,
dsn::enum_to_string(iter->second)));
LOG_ERROR("{} split for app({}) failed, {}",
control_type_str(control_type),
app_name,
response.hint_msg);
}
}
// Applies a pause / restart / cancel request to every splitting partition of `app`.
//
// CANCEL:
//   - old_partition_count must be half of the current count, else ERR_INVALID_PARAMETERS;
//   - no partition may have finished its split yet, else ERR_CHILD_REGISTERED;
//   - every entry of the split status map becomes CANCELING.
// PAUSE / RESTART:
//   - every partition in SPLITTING / PAUSED status moves to PAUSING / SPLITTING; partitions in
//     any other status are left untouched.
// Every successful path reports ERR_OK explicitly.
void meta_split_service::do_control_all(std::shared_ptr<app_state> app, control_split_rpc rpc)
{
    const auto &req = rpc.request();
    const auto &control_type = req.control_type;
    auto &response = rpc.response();

    if (control_type == split_control_type::CANCEL) {
        if (req.old_partition_count != app->partition_count / 2) {
            response.err = ERR_INVALID_PARAMETERS;
            response.__set_hint_msg(
                fmt::format("wrong partition_count, should be {}", app->partition_count / 2));
            LOG_ERROR("cancel split for app({}) failed, wrong partition count: partition count({}) "
                      "VS req partition_count({})",
                      app->app_name,
                      app->partition_count,
                      req.old_partition_count);
            return;
        }

        // Cancel is only allowed while all parents are still splitting.
        if (app->helpers->split_states.splitting_count != req.old_partition_count) {
            response.err = ERR_CHILD_REGISTERED;
            response.__set_hint_msg("some partitions have already finished split");
            LOG_ERROR("cancel split for app({}) failed, {}", app->app_name, response.hint_msg);
            return;
        }

        for (auto &kv : app->helpers->split_states.status) {
            LOG_INFO("app({}) partition({}) cancel split, old status = {}",
                     app->app_name,
                     kv.first,
                     dsn::enum_to_string(kv.second));
            kv.second = split_status::CANCELING;
        }
        // Report success explicitly, like the pause/restart path below, instead of relying on
        // the default value of the response.
        response.err = ERR_OK;
        return;
    }

    split_status::type old_status =
        control_type == split_control_type::PAUSE ? split_status::SPLITTING : split_status::PAUSED;
    split_status::type target_status =
        control_type == split_control_type::PAUSE ? split_status::PAUSING : split_status::SPLITTING;
    for (auto &kv : app->helpers->split_states.status) {
        if (kv.second == old_status) {
            kv.second = target_status;
            LOG_INFO("app({}) partition[{}] {} split succeed",
                     app->app_name,
                     kv.first,
                     control_type_str(control_type));
        }
    }
    response.err = ERR_OK;
}
// Handles a notification that a parent partition has stopped (paused or canceled) its split.
//
// The request is treated as out-dated (ERR_INVALID_VERSION) when the partition has no split
// status entry or when its status differs from request.meta_split_status.
// PAUSING   : the status becomes PAUSED.
// CANCELING : the status entry is removed and the splitting counter is decremented; when the
//             counter reaches zero the partition count is restored through
//             do_cancel_partition_split().
void meta_split_service::notify_stop_split(notify_stop_split_rpc rpc)
{
    const auto &request = rpc.request();
    auto &response = rpc.response();

    zauto_write_lock l(app_lock());
    std::shared_ptr<app_state> app = _state->get_app(request.app_name);
    CHECK(app, "app({}) is not existed", request.app_name);
    CHECK(app->is_stateful, "app({}) is stateless currently", request.app_name);
    CHECK(request.meta_split_status == split_status::PAUSING ||
              request.meta_split_status == split_status::CANCELING,
          "invalid split_status({})",
          dsn::enum_to_string(request.meta_split_status));

    const bool is_pause = (request.meta_split_status == split_status::PAUSING);
    const std::string stop_type(is_pause ? "pause" : "cancel");
    const auto parent_pidx = request.parent_gpid.get_partition_index();

    auto &split_status_map = app->helpers->split_states.status;
    auto status_it = split_status_map.find(parent_pidx);
    if (status_it == split_status_map.end()) {
        LOG_WARNING(
            "app({}) partition({}) is not executing partition split, ignore out-dated {} split "
            "request",
            app->app_name,
            request.parent_gpid,
            stop_type);
        response.err = ERR_INVALID_VERSION;
        return;
    }

    if (request.meta_split_status != status_it->second) {
        LOG_WARNING("app({}) partition({}) split_status = {}, ignore out-dated {} split request",
                    app->app_name,
                    request.parent_gpid,
                    dsn::enum_to_string(status_it->second),
                    stop_type);
        response.err = ERR_INVALID_VERSION;
        return;
    }

    LOG_INFO("app({}) partition({}) notify {} split succeed",
             app->app_name,
             request.parent_gpid,
             stop_type);

    // pausing split
    if (status_it->second == split_status::PAUSING) {
        status_it->second = split_status::PAUSED;
        response.err = ERR_OK;
        return;
    }

    // canceling split
    CHECK_EQ_MSG(request.partition_count * 2, app->partition_count, "wrong partition_count");
    split_status_map.erase(status_it);
    response.err = ERR_OK;

    // when all partitions finish, partition_count should be updated
    auto &remaining = app->helpers->split_states.splitting_count;
    --remaining;
    if (remaining == 0) {
        do_cancel_partition_split(std::move(app), rpc);
    }
}
void meta_split_service::do_cancel_partition_split(std::shared_ptr<app_state> app,
notify_stop_split_rpc rpc)
{
auto on_write_storage_complete = [app, rpc, this]() {
LOG_INFO("app({}) update partition count on remote storage, new partition count is {}",
app->app_name,
app->partition_count / 2);
zauto_write_lock l(app_lock());
app->partition_count /= 2;
app->helpers->contexts.resize(app->partition_count);
app->partitions.resize(app->partition_count);
_state->get_table_metric_entities().resize_partitions(app->app_id, app->partition_count);
};
auto copy = *app;
copy.partition_count = rpc.request().partition_count;
blob value = dsn::json::json_forwarder<app_info>::encode(copy);
_meta_svc->get_meta_storage()->set_data(
_state->get_app_path(*app), std::move(value), on_write_storage_complete);
}
// Answers a parent partition's query about the state of its child partition.
//
// ERR_INVALID_STATE is returned when the app's partition count equals the one in the request
// (no split in progress) or when the child's ballot is still invalid_ballot. Otherwise the
// response carries the app's partition count and the child partition configuration.
// The child index is the parent index plus request.partition_count.
void meta_split_service::query_child_state(query_child_state_rpc rpc)
{
    const auto &request = rpc.request();
    auto &response = rpc.response();
    const auto &app_name = request.app_name;
    const auto &parent_pid = request.pid;

    zauto_read_lock l(app_lock());
    std::shared_ptr<app_state> app = _state->get_app(app_name);
    CHECK(app, "app({}) is not existed", app_name);
    CHECK(app->is_stateful, "app({}) is stateless currently", app_name);

    if (request.partition_count == app->partition_count) {
        LOG_ERROR("app({}) is not executing partition split", app_name);
        response.err = ERR_INVALID_STATE;
        return;
    }

    CHECK_EQ_MSG(app->partition_count,
                 request.partition_count * 2,
                 "app({}) has invalid partition_count",
                 app_name);

    const auto child_pidx = parent_pid.get_partition_index() + request.partition_count;
    const auto &child = app->partitions[child_pidx];
    if (child.ballot == invalid_ballot) {
        LOG_ERROR("app({}) parent partition({}) split has been canceled", app_name, parent_pid);
        response.err = ERR_INVALID_STATE;
        return;
    }

    LOG_INFO(
        "app({}) child partition({}.{}) is ready", app_name, parent_pid.get_app_id(), child_pidx);
    response.__set_partition_count(app->partition_count);
    response.__set_child_config(child);
    response.err = ERR_OK;
}
} // namespace replication
} // namespace dsn