blob: 7f9a295361c0a24a2b6374ee15c881a8d8b336c3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "duplication_info.h"
#include "common/duplication_common.h"
#include "meta/meta_data.h"
#include "runtime/api_layer1.h"
#include "utils/fmt_logging.h"
namespace dsn {
namespace replication {
// Writes a duplication status to `out` as the string produced by
// duplication_status_to_string(), delegating the actual emission to the
// generic dsn::json encoder.
/*extern*/ void json_encode(dsn::json::JsonWriter &out, const duplication_status::type &s)
{
    dsn::json::json_encode(out, duplication_status_to_string(s));
}
// Reads a string from `in` and maps it to a duplication status through a
// fixed name -> value table.
//
// Returns true and stores the mapped value in `s` when the name is in the
// table. Otherwise logs an error, stores DS_REMOVED in `s` and returns false.
// The result of the underlying string decode is not checked here; any name
// that is not in the table simply takes the fallback path.
/*extern*/ bool json_decode(const dsn::json::JsonObject &in, duplication_status::type &s)
{
    static const std::map<std::string, duplication_status::type> known_statuses = {
        {"DS_INIT", duplication_status::DS_INIT},
        {"DS_PREPARE", duplication_status::DS_PREPARE},
        {"DS_APP", duplication_status::DS_APP},
        {"DS_LOG", duplication_status::DS_LOG},
        {"DS_PAUSE", duplication_status::DS_PAUSE},
        {"DS_REMOVED", duplication_status::DS_REMOVED},
    };

    std::string status_name;
    json::json_decode(in, status_name);

    const auto found = known_statuses.find(status_name);
    if (found == known_statuses.end()) {
        LOG_ERROR("unexpected duplication_status name: {}", status_name);
        // Forward compatibility: a duplication whose status this build does
        // not recognise is marked as removed so that it stays invisible.
        s = duplication_status::DS_REMOVED;
        return false;
    }

    s = found->second;
    return true;
}
// Writes a duplication fail mode to `out` as the string produced by
// duplication_fail_mode_to_string(), delegating the actual emission to the
// generic dsn::json encoder.
/*extern*/ void json_encode(dsn::json::JsonWriter &out, const duplication_fail_mode::type &fmode)
{
    dsn::json::json_encode(out, duplication_fail_mode_to_string(fmode));
}
// Reads a string from `in` and maps it to a duplication fail mode through a
// fixed name -> value table.
//
// Returns true and stores the mapped value in `fmode` when the name is in the
// table. Otherwise logs an error, stores FAIL_SLOW in `fmode` and returns
// false. The result of the underlying string decode is not checked here; any
// name that is not in the table simply takes the fallback path.
/*extern*/ bool json_decode(const dsn::json::JsonObject &in, duplication_fail_mode::type &fmode)
{
    static const std::map<std::string, duplication_fail_mode::type> known_fail_modes = {
        {"FAIL_SLOW", duplication_fail_mode::FAIL_SLOW},
        {"FAIL_SKIP", duplication_fail_mode::FAIL_SKIP},
        {"FAIL_FAST", duplication_fail_mode::FAIL_FAST},
    };

    std::string mode_name;
    json::json_decode(in, mode_name);

    const auto found = known_fail_modes.find(mode_name);
    if (found == known_fail_modes.end()) {
        LOG_ERROR("unexpected duplication_fail_mode name: {}", mode_name);
        // Unknown names fall back to the default fail mode.
        fmode = duplication_fail_mode::FAIL_SLOW;
        return false;
    }

    fmode = found->second;
    return true;
}
// Requests a transition of this duplication to `to_status` / `to_fail_mode`.
//
// Original author's precondition: "lock held" -- which lock is meant is not
// visible from this file (presumably one owned by the caller; TODO confirm).
// NOTE(review): the checks below read _is_altering, _status and _fail_mode
// before this object's own _lock is acquired for writing.
//
// Returns:
//   ERR_BUSY               - a previous alteration has not been persisted yet
//   ERR_OBJECT_NOT_FOUND   - the duplication is already in DS_REMOVED
//   ERR_INVALID_PARAMETERS - is_valid_alteration() rejected `to_status`
//   ERR_OK                 - nothing to change, or the change was staged in
//                            _next_status / _next_fail_mode
error_code duplication_info::alter_status(duplication_status::type to_status,
                                          duplication_fail_mode::type to_fail_mode)
{
    if (_is_altering) {
        return ERR_BUSY;
    }

    const bool already_removed = (_status == duplication_status::DS_REMOVED);
    if (already_removed) {
        return ERR_OBJECT_NOT_FOUND;
    }

    if (!is_valid_alteration(to_status)) {
        return ERR_INVALID_PARAMETERS;
    }

    // Both the status and the fail mode already match: succeed without
    // staging anything or flagging an alteration.
    const bool nothing_to_change = (_status == to_status) && (_fail_mode == to_fail_mode);
    if (nothing_to_change) {
        return ERR_OK;
    }

    // Stage the new values; persist_status() later commits them.
    zauto_write_lock guard(_lock);
    _is_altering = true;
    _next_status = to_status;
    _next_fail_mode = to_fail_mode;
    return ERR_OK;
}
void duplication_info::init_progress(int partition_index, decree d)
{
zauto_write_lock l(_lock);
auto &p = _progress[partition_index];
p.volatile_decree = p.stored_decree = d;
p.is_inited = true;
}
bool duplication_info::alter_progress(int partition_index,
const duplication_confirm_entry &confirm_entry)
{
zauto_write_lock l(_lock);
partition_progress &p = _progress[partition_index];
if (!p.is_inited) {
return false;
}
if (p.is_altering) {
return false;
}
p.checkpoint_prepared = confirm_entry.checkpoint_prepared;
if (p.volatile_decree < confirm_entry.confirmed_decree) {
p.volatile_decree = confirm_entry.confirmed_decree;
}
if (p.volatile_decree != p.stored_decree) {
// progress update is not supposed to be too frequent.
if (dsn_now_ms() > p.last_progress_update_ms + PROGRESS_UPDATE_PERIOD_MS) {
p.is_altering = true;
p.last_progress_update_ms = dsn_now_ms();
return true;
}
}
return false;
}
void duplication_info::persist_progress(int partition_index)
{
zauto_write_lock l(_lock);
auto &p = _progress[partition_index];
CHECK_PREFIX_MSG(p.is_altering, "partition_index: {}", partition_index);
p.is_altering = false;
p.stored_decree = p.volatile_decree;
}
// Commits the status / fail mode staged by alter_status(), under the write
// lock. When no alteration is pending, an error is logged and nothing changes.
// After a commit, _next_status is reset to DS_INIT and the altering flag is
// cleared.
void duplication_info::persist_status()
{
    zauto_write_lock guard(_lock);

    if (_is_altering) {
        LOG_INFO_PREFIX("change duplication status from {} to {} successfully [app_id: {}]",
                        duplication_status_to_string(_status),
                        duplication_status_to_string(_next_status),
                        app_id);

        _is_altering = false;
        _status = _next_status;
        _next_status = duplication_status::DS_INIT;
        _fail_mode = _next_fail_mode;
        return;
    }

    LOG_ERROR_PREFIX("callers never write a duplication that is not altering to meta store");
}
// Returns a textual form of this duplication: the object is first converted
// with to_duplication_entry() and the result is rendered by
// duplication_entry_to_string().
std::string duplication_info::to_string() const
{
    const auto entry = to_duplication_entry();
    return duplication_entry_to_string(entry);
}
// Encodes the fields kept in json_helper into a JSON blob.
// Note that the *staged* values (_next_status / _next_fail_mode) are written,
// not the currently committed _status / _fail_mode.
blob duplication_info::to_json_blob() const
{
    json_helper snapshot;
    snapshot.status = _next_status;
    snapshot.fail_mode = _next_fail_mode;
    snapshot.remote = follower_cluster_name;
    snapshot.create_timestamp_ms = create_timestamp_ms;

    return json::json_forwarder<json_helper>::encode(snapshot);
}
// Logs to_string() at INFO level, but only when more than
// PROGRESS_REPORT_PERIOD_MS has passed since the previous report; the report
// timestamp is refreshed just before logging.
void duplication_info::report_progress_if_time_up()
{
    // progress report is not supposed to be too frequent.
    if (dsn_now_ms() <= _last_progress_report_ms + PROGRESS_REPORT_PERIOD_MS) {
        return;
    }

    _last_progress_report_ms = dsn_now_ms();
    LOG_INFO("duplication report: {}", to_string());
}
// Rebuilds a duplication_info from its JSON form.
//
// `json` is decoded into a json_helper; the meta server list of the remote
// cluster named there is then loaded via replica_helper::load_meta_servers()
// from the section duplication_constants::kClustersSectionName. Returns
// nullptr when either the decode or the meta server lookup fails. On success
// the new object's _status and _fail_mode are taken from the decoded JSON.
duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id,
                                                          int32_t app_id,
                                                          const std::string &app_name,
                                                          int32_t partition_count,
                                                          std::string store_path,
                                                          const blob &json)
{
    json_helper decoded;
    const bool decode_ok = json::json_forwarder<json_helper>::decode(json, decoded);
    if (!decode_ok) {
        return nullptr;
    }

    std::vector<rpc_address> remote_metas;
    const bool metas_loaded = dsn::replication::replica_helper::load_meta_servers(
        remote_metas, duplication_constants::kClustersSectionName.c_str(), decoded.remote.c_str());
    if (!metas_loaded) {
        return nullptr;
    }

    auto restored = std::make_shared<duplication_info>(dup_id,
                                                       app_id,
                                                       app_name,
                                                       partition_count,
                                                       decoded.create_timestamp_ms,
                                                       std::move(decoded.remote),
                                                       std::move(remote_metas),
                                                       std::move(store_path));
    restored->_status = decoded.status;
    restored->_fail_mode = decoded.fail_mode;
    return restored;
}
void duplication_info::append_if_valid_for_query(
const app_state &app,
/*out*/ std::vector<duplication_entry> &entry_list) const
{
zauto_read_lock l(_lock);
entry_list.emplace_back(to_duplication_entry());
duplication_entry &ent = entry_list.back();
// the confirmed decree is not useful for displaying
// the overall state of duplication
ent.__isset.progress = false;
}
} // namespace replication
} // namespace dsn