blob: a9e7edaed4667f4699ed7b74808717e449759e16 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/filesystem/path.hpp>
#include <boost/system/error_code.hpp>
#include <fcntl.h>
#include <fmt/core.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
#include <sys/types.h>
#include <unistd.h>
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "common/replication_other_types.h"
#include "consensus_types.h"
#include "duplication_types.h"
#include "perf_counter/perf_counter.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "replica/duplication/mutation_duplicator.h"
#include "replica/duplication/replica_duplicator.h"
#include "replica/log_file.h"
#include "replica/mutation.h"
#include "replica/mutation_log.h"
#include "replica/test/mock_utils.h"
#include "runtime/pipeline.h"
#include "runtime/rpc/rpc_holder.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_tracker.h"
#include "utils/autoref_ptr.h"
#include "utils/chrono_literals.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#define BOOST_NO_CXX11_SCOPED_ENUMS
#include <boost/filesystem/operations.hpp>
#include <chrono>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#undef BOOST_NO_CXX11_SCOPED_ENUMS
#include "duplication_test_base.h"
#include "replica/duplication/load_from_private_log.h"
#include "replica/mutation_log_utils.h"
namespace dsn {
namespace replication {
// Defines RPC_RRDB_RRDB_PUT as a storage-write RPC code (ALLOW_BATCH, IS_IDEMPOTENT) for this
// test binary.
// NOTE(review): presumably required because the mutations built by create_test_mutation()
// carry this code — confirm in duplication_test_base.h.
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_PUT, ALLOW_BATCH, IS_IDEMPOTENT)
// Fixture for load_from_private_log: writes mutations into the replica's private log and drives
// a replica_duplicator pipeline that reads them back.
class load_from_private_log_test : public duplication_test_base
{
public:
    load_from_private_log_test()
    {
        _replica->init_private_log(_log_dir);
        duplicator = create_test_duplicator();
    }

    // Writes `files_num` rounds of 10 mutations to the private log: round `f` writes decrees
    // [10 * f + 1, 10 * f + 10], so decrees [1, files_num * 10] are written overall.
    // The log is reopened at the start of every round and closed at its end.
    // Returns the number of entries written.
    int generate_multiple_log_files(uint files_num = 3)
    {
        constexpr int entries_per_round = 10;
        // Use a signed round count so the loop index and the decree arithmetic below do not mix
        // signed and unsigned operands.
        const int rounds = static_cast<int>(files_num);
        for (int f = 0; f < rounds; f++) {
            // each round mlog will replay the former logs, and create new file
            mutation_log_ptr mlog = create_private_log();
            for (int i = 1; i <= entries_per_round; i++) {
                std::string msg = "hello!";
                mutation_ptr mu = create_test_mutation(entries_per_round * f + i, msg);
                mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
            }
            mlog->tracker()->wait_outstanding_tasks();
            mlog->close();
        }
        return rounds * entries_per_round;
    }

    // Checks which log file find_log_file_to_start() selects: none for an empty file map,
    // file #1 for start decrees 1 and 5, and the last file for a start decree beyond every
    // written entry.
    void test_find_log_file_to_start()
    {
        load_from_private_log load(_replica.get(), duplicator.get());

        int max_log_file_mb = 1;
        // NOTE(review): this log is opened but never closed here; kept as-is because the file
        // indices asserted below may depend on it — confirm before changing.
        mutation_log_ptr mlog = new mutation_log_private(
            _replica->dir(), max_log_file_mb, _replica->get_gpid(), _replica.get());
        EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);

        load.find_log_file_to_start({});
        ASSERT_FALSE(load._current);

        int num_entries = generate_multiple_log_files(3);
        auto files = open_log_file_map(_log_dir);

        load.set_start_decree(1);
        load.find_log_file_to_start(files);
        ASSERT_TRUE(load._current);
        ASSERT_EQ(load._current->index(), 1);

        load._current = nullptr;
        load.set_start_decree(5);
        load.find_log_file_to_start(files);
        ASSERT_TRUE(load._current);
        ASSERT_EQ(load._current->index(), 1);

        int last_idx = files.rbegin()->first;
        load._current = nullptr;
        load.set_start_decree(num_entries + 200);
        load.find_log_file_to_start(files);
        ASSERT_TRUE(load._current);
        ASSERT_EQ(load._current->index(), last_idx);
    }

    // Writes decrees [10, 10 + num_entries] plus one trailing mutation (decree
    // 10 + num_entries + 1) that commits the previous entry. `private_log_size_mb` is passed in
    // the same position as `max_log_file_mb` in test_find_log_file_to_start(). The entries are
    // then loaded back, starting from decree 10.
    void test_start_duplication(int num_entries, int private_log_size_mb)
    {
        mutation_log_ptr mlog = create_private_log(private_log_size_mb, _replica->get_gpid());
        int last_commit_decree_start = 5;
        int decree_start = 10;
        {
            DSN_DECLARE_bool(plog_force_flush);
            // Force flushing while writing; the previous value is restored after the appends.
            auto reserved_plog_force_flush = FLAGS_plog_force_flush;
            FLAGS_plog_force_flush = true;
            for (int i = decree_start; i <= num_entries + decree_start; i++) {
                std::string msg = "hello!";
                // decree - last_commit_decree = 1 by default
                mutation_ptr mu = create_test_mutation(i, msg);
                // mock the last_commit_decree of first mu equal with `last_commit_decree_start`
                if (i == decree_start) {
                    mu->data.header.last_committed_decree = last_commit_decree_start;
                }
                mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
            }
            // commit the last entry
            mutation_ptr mu = create_test_mutation(decree_start + num_entries + 1, "hello!");
            mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
            FLAGS_plog_force_flush = reserved_plog_force_flush;
            mlog->close();
        }
        load_and_wait_all_entries_loaded(num_entries, num_entries, decree_start);
    }

    // Convenience overload: the replica's own gpid, confirmed_decree = -1.
    mutation_tuple_set
    load_and_wait_all_entries_loaded(int total, int last_decree, decree start_decree)
    {
        return load_and_wait_all_entries_loaded(
            total, last_decree, _replica->get_gpid(), start_decree, -1);
    }

    // Convenience overload: the replica's own gpid, start_decree = 0, confirmed_decree = -1.
    mutation_tuple_set load_and_wait_all_entries_loaded(int total, int last_decree)
    {
        return load_and_wait_all_entries_loaded(total, last_decree, _replica->get_gpid(), 0, -1);
    }

    // Opens the private log of `id` and creates a duplicator with `confirmed_decree`. It then
    // runs load_from_private_log from `start_decree` while injecting open/read faults. The loader
    // is re-run until at least `total` distinct mutations have been collected AND a decree
    // >= `last_decree` has been delivered to the end stage.
    // Returns every mutation collected by the end stage.
    mutation_tuple_set load_and_wait_all_entries_loaded(
        int total, int last_decree, gpid id, decree start_decree, decree confirmed_decree)
    {
        mutation_log_ptr mlog = create_private_log(id);
        for (const auto &pr : mlog->get_log_file_map()) {
            EXPECT_TRUE(pr.second->file_handle() == nullptr);
        }
        _replica->init_private_log(mlog);
        duplicator = create_test_duplicator(confirmed_decree);

        load_from_private_log load(_replica.get(), duplicator.get());
        // Override the (const) retry delay for this test.
        const_cast<std::chrono::milliseconds &>(load._repeat_delay) = 1_s;
        load.set_start_decree(start_decree);

        mutation_tuple_set loaded_mutations;
        pipeline::do_when<decree, mutation_tuple_set> end_stage(
            [&loaded_mutations, &load, total, last_decree](decree &&d,
                                                          mutation_tuple_set &&mutations) {
                // we create one mutation_update per mutation
                // the mutations are started from 1
                // Insert the whole batch at once instead of copying each tuple in a loop.
                loaded_mutations.insert(mutations.begin(), mutations.end());
                // `total` is never negative here, so comparing in size_t is exact.
                if (loaded_mutations.size() < static_cast<size_t>(total) || d < last_decree) {
                    load.run();
                }
            });
        duplicator->from(load).link(end_stage);

        // inject some faults
        fail::setup();
        fail::cfg("open_read", "25%1*return()");
        fail::cfg("mutation_log_read_log_block", "25%1*return()");
        fail::cfg("duplication_sync_complete", "void()");
        duplicator->run_pipeline();
        duplicator->wait_all();
        fail::teardown();
        return loaded_mutations;
    }

    // After the first of two log files is deleted, a duplication starting at
    // max_gced_decree + 1 must pick the remaining file (#2).
    void test_restart_duplication()
    {
        load_from_private_log load(_replica.get(), duplicator.get());
        generate_multiple_log_files(2);

        std::vector<std::string> files;
        ASSERT_EQ(log_utils::list_all_files(_log_dir, files), error_s::ok());
        ASSERT_EQ(files.size(), 2);

        boost::filesystem::remove(_log_dir + "/log.1.0");

        mutation_log_ptr mlog = create_private_log();
        decree max_gced_decree = mlog->max_gced_decree_no_lock(_replica->get_gpid());
        // new duplication, start_decree = max_gced_decree + 1
        // ensure we can find the first file.
        load.set_start_decree(max_gced_decree + 1);
        load.find_log_file_to_start(mlog->get_log_file_map());
        ASSERT_TRUE(load._current);
        ASSERT_EQ(load._current->index(), 2);
    }

    mutation_log_ptr create_private_log(gpid id) { return create_private_log(1, id); }

    // Opens (replaying existing files of) a private log for `id` in the replica directory,
    // retrying up to 5 times. A persistent open failure is reported as a test failure instead
    // of being silently returned to the caller.
    mutation_log_ptr create_private_log(int private_log_size_mb = 1, gpid id = gpid(1, 1))
    {
        constexpr int max_open_attempts = 5;

        std::map<gpid, decree> replay_condition;
        replay_condition[id] = 0; // duplicating
        mutation_log::replay_callback cb = [](int, mutation_ptr &) { return true; };

        mutation_log_ptr mlog;
        error_code err = ERR_OK;
        for (int attempt = 0; attempt < max_open_attempts; attempt++) {
            mlog =
                new mutation_log_private(_replica->dir(), private_log_size_mb, id, _replica.get());
            err = mlog->open(cb, nullptr, replay_condition);
            if (err == ERR_OK) {
                break;
            }
            LOG_ERROR("mlog open failed, encountered error: {}", err);
        }
        // The loop always runs at least once, so `err` holds the result of the last attempt.
        EXPECT_EQ(err, ERR_OK);
        return mlog;
    }

    std::unique_ptr<replica_duplicator> duplicator;
};
// Delegates to the fixture helper of the same name.
TEST_F(load_from_private_log_test, find_log_file_to_start)
{
    test_find_log_file_to_start();
}
// 10000 entries written with a 4MB private log size.
TEST_F(load_from_private_log_test, start_duplication_10000_4MB)
{
    test_start_duplication(/*num_entries=*/10000, /*private_log_size_mb=*/4);
}
// 50000 entries written with a 4MB private log size.
TEST_F(load_from_private_log_test, start_duplication_50000_4MB)
{
    test_start_duplication(/*num_entries=*/50000, /*private_log_size_mb=*/4);
}
// 10000 entries written with a 1MB private log size.
TEST_F(load_from_private_log_test, start_duplication_10000_1MB)
{
    test_start_duplication(/*num_entries=*/10000, /*private_log_size_mb=*/1);
}
// 50000 entries written with a 1MB private log size.
TEST_F(load_from_private_log_test, start_duplication_50000_1MB)
{
    test_start_duplication(/*num_entries=*/50000, /*private_log_size_mb=*/1);
}
// 100000 entries written with a 4MB private log size.
TEST_F(load_from_private_log_test, start_duplication_100000_4MB)
{
    test_start_duplication(/*num_entries=*/100000, /*private_log_size_mb=*/4);
}
// Ensure replica_duplicator can correctly handle real-world private log files.
TEST_F(load_from_private_log_test, handle_real_private_log)
{
    struct test_data
    {
        // Pre-recorded log file; a bare relative path, so it is resolved against the working
        // directory of the test.
        std::string fname;
        // Passed as `total`: the minimum number of mutations that must be loaded.
        int puts;
        // Passed as `last_decree`: loading continues until a decree >= this one is delivered.
        int total;
        // Replica the log file belongs to.
        gpid id;
    } tests[] = {
        // PUT, PUT, PUT, EMPTY, PUT, EMPTY, EMPTY
        {"log.1.0.handle_real_private_log", 4, 6, gpid(1, 4)},
        // EMPTY, PUT, EMPTY
        {"log.1.0.handle_real_private_log2", 1, 2, gpid(1, 4)},
        // EMPTY, EMPTY, EMPTY
        {"log.1.0.all_loaded_are_write_empties", 0, 2, gpid(1, 5)},
    };

    // Iterate by const reference: every entry owns a std::string that need not be copied.
    for (const auto &tt : tests) {
        // reset replica to specified gpid
        duplicator.reset(nullptr);
        _replica = create_mock_replica(stub.get(), tt.id.get_app_id(), tt.id.get_partition_index());
        // Update '_log_dir' to the corresponding replica created above.
        _log_dir = _replica->dir();

        // Copy the log file to '_log_dir'
        boost::filesystem::path file(tt.fname);
        boost::system::error_code ec;
        boost::filesystem::copy_file(
            file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists, ec);
        ASSERT_TRUE(!ec) << "failed to copy " << tt.fname << " to " << _log_dir << ": "
                         << ec.message();

        // Start to verify.
        load_and_wait_all_entries_loaded(tt.puts, tt.total, tt.id, 1, 0);
    }
}
// Delegates to the fixture helper test_restart_duplication().
TEST_F(load_from_private_log_test, restart_duplication)
{
    test_restart_duplication();
}
// Mutations with a decree below the start decree must not be loaded.
TEST_F(load_from_private_log_test, ignore_useless)
{
    utils::filesystem::remove_path(_log_dir);
    mutation_log_ptr mlog = create_private_log();

    // Write decrees [1, num_entries].
    const int num_entries = 100;
    for (int i = 1; i <= num_entries; i++) {
        std::string msg = "hello!";
        mutation_ptr mu = create_test_mutation(i, msg);
        mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
    }

    // commit the last entry
    mutation_ptr mu = create_test_mutation(1 + num_entries, "hello!");
    mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
    mlog->close();

    // starts from 51
    mutation_tuple_set result =
        load_and_wait_all_entries_loaded(50, 100, _replica->get_gpid(), 51, 0);
    ASSERT_EQ(result.size(), 50U);

    // starts from 100
    result = load_and_wait_all_entries_loaded(1, 100, _replica->get_gpid(), 100, 0);
    ASSERT_EQ(result.size(), 1U);

    // A new duplication's confirmed_decree is invalid_decree (passed as -1 here). In this case
    // duplication starts from last_commit(100) + 1 = 101, so no mutation will be loaded.
    result = load_and_wait_all_entries_loaded(0, 100, _replica->get_gpid(), 101, -1);
    ASSERT_EQ(result.size(), 0U);
}
// Fixture that writes several private log files and wires a loading pipeline. Its end stage
// advances the start decree past every delivered decree and re-runs the loader while that
// decree is below (entries written - 1).
class load_fail_mode_test : public load_from_private_log_test
{
public:
    using end_stage_t = pipeline::do_when<decree, mutation_tuple_set>;

    void SetUp() override
    {
        const int written = generate_multiple_log_files();

        // Build the loading pipeline on top of a freshly opened private log.
        mlog = create_private_log();
        _replica->init_private_log(mlog);
        duplicator = create_test_duplicator(1);

        load = std::make_unique<load_from_private_log>(_replica.get(), duplicator.get());
        load->TEST_set_repeat_delay(0_ms); // retry without any delay
        load->set_start_decree(duplicator->progress().last_decree + 1);

        end_stage = std::make_unique<end_stage_t>(
            [this, written](decree &&d, mutation_tuple_set && /*mutations*/) {
                load->set_start_decree(d + 1);
                const bool keep_loading = d < written - 1;
                if (keep_loading) {
                    load->run();
                }
            });
        duplicator->from(*load).link(*end_stage);
    }

    mutation_log_ptr mlog;
    std::unique_ptr<load_from_private_log> load;
    std::unique_ptr<end_stage_t> end_stage;
};
// Under FAIL_SKIP, a file whose blocks keep failing to replay is eventually skipped.
TEST_F(load_fail_mode_test, fail_skip)
{
    duplicator->update_fail_mode(duplication_fail_mode::FAIL_SKIP);
    ASSERT_EQ(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0);

    // Make block replay fail on 100% of hits, for block-repeats * file-repeats hits. This will
    // trigger fail-skip and read the subsequent file; some mutations will be lost.
    const auto failing_hits = load->MAX_ALLOWED_BLOCK_REPEATS * load->MAX_ALLOWED_FILE_REPEATS;
    const std::string replay_block_action = fmt::format("100%{}*return()", failing_hits);

    fail::setup();
    fail::cfg("mutation_log_replay_block", replay_block_action);
    duplicator->run_pipeline();
    duplicator->wait_all();
    fail::teardown();

    // The file failed the maximum allowed number of times and some bytes were skipped.
    ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(),
              load_from_private_log::MAX_ALLOWED_FILE_REPEATS);
    ASSERT_GT(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0);
}
// Under FAIL_SLOW, replay failures are counted but no data is ever skipped.
TEST_F(load_fail_mode_test, fail_slow)
{
    duplicator->update_fail_mode(duplication_fail_mode::FAIL_SLOW);
    ASSERT_EQ(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0);
    ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(), 0);

    // Will trigger fail-slow and retry indefinitely. The failpoint only fires for a bounded
    // number of hits (block-repeats * file-repeats), so the pipeline can still finish.
    const auto failing_hits = load->MAX_ALLOWED_BLOCK_REPEATS * load->MAX_ALLOWED_FILE_REPEATS;
    const std::string replay_block_action = fmt::format("100%{}*return()", failing_hits);

    fail::setup();
    fail::cfg("mutation_log_replay_block", replay_block_action);
    duplicator->run_pipeline();
    duplicator->wait_all();
    fail::teardown();

    // Failures were recorded, yet nothing was skipped.
    ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(),
              load_from_private_log::MAX_ALLOWED_FILE_REPEATS);
    ASSERT_EQ(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0);
}
// Under FAIL_SKIP, a log file that is genuinely corrupted on disk (no failpoint involved) must
// be skipped after the maximum number of file retries.
TEST_F(load_fail_mode_test, fail_skip_real_corrupted_file)
{
    { // inject some bad data in the middle of the first file
        const std::string log_path = _log_dir + "/log.1.0";
        const auto file_size = boost::filesystem::file_size(log_path);
        const int fd = open(log_path.c_str(), O_WRONLY);
        ASSERT_GE(fd, 0) << "failed to open " << log_path;

        // sizeof(buf) is 7: the six 'x' characters plus the trailing NUL are all written.
        const char buf[] = "xxxxxx";
        const ssize_t written_size = pwrite(fd, buf, sizeof(buf), file_size / 2);
        // Close before asserting: a failing ASSERT_* returns from the test body immediately and
        // would otherwise leak the descriptor.
        close(fd);
        ASSERT_EQ(written_size, static_cast<ssize_t>(sizeof(buf)));
    }

    duplicator->update_fail_mode(duplication_fail_mode::FAIL_SKIP);
    duplicator->run_pipeline();
    duplicator->wait_all();

    // ensure the bad file will be skipped
    ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(),
              load_from_private_log::MAX_ALLOWED_FILE_REPEATS);
    ASSERT_GT(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0);
}
} // namespace replication
} // namespace dsn