blob: 3cbcddc622fedeeb6f5c6e2c905914fee8b439d0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/fs/block_manager.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
METRIC_DECLARE_entity(server);
METRIC_DECLARE_entity(tablet);
METRIC_DECLARE_gauge_int32(num_raft_leaders);
METRIC_DECLARE_gauge_size(num_rowsets_on_disk);
METRIC_DECLARE_gauge_uint64(data_dirs_failed);
METRIC_DECLARE_gauge_uint32(tablets_num_failed);
METRIC_DECLARE_gauge_uint64(wal_dir_space_available_bytes);
METRIC_DECLARE_gauge_uint64(data_dirs_space_available_bytes);
using kudu::client::sp::shared_ptr;
using kudu::client::KuduClient;
using kudu::client::KuduDelete;
using kudu::client::KuduInsert;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::cluster::ExternalDaemon;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::cluster::ExternalTabletServer;
using kudu::fs::BlockManager;
using kudu::KuduPartialRow;
using std::pair;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
// Timeout used by the tests in this file when listing the tablets hosted by a
// tablet server and when waiting for tablets to reach a running state.
const MonoDelta kAgreementTimeout = MonoDelta::FromSeconds(30);
// Fixture for disk failure tests. It is parameterized by a (string, bool)
// tuple; the tests in this file read the string as the block manager type and
// the bool as the switch for the 3-4-3 replica management scheme.
class DiskFailureITest : public ExternalMiniClusterITestBase,
                         public ::testing::WithParamInterface<std::tuple<string, bool>> {
 public:
  // Retries (via ASSERT_EVENTUALLY) until the 'data_dirs_failed' server metric
  // fetched from the HTTP endpoint of 'ext_tserver' equals
  // 'target_failed_disks'.
  void WaitForDiskFailures(const ExternalTabletServer* ext_tserver,
                           int64_t target_failed_disks = 1) const {
    ASSERT_EVENTUALLY([&] {
      int64_t num_failed_dirs;
      const auto& http_addr = ext_tserver->bound_http_hostport();
      ASSERT_OK(itest::GetInt64Metric(http_addr,
                                      &METRIC_ENTITY_server,
                                      nullptr,
                                      &METRIC_data_dirs_failed,
                                      "value",
                                      &num_failed_dirs));
      ASSERT_EQ(target_failed_disks, num_failed_dirs);
    });
  }
};
// Test ensuring that a tablet server can be started with failed directories. A
// cluster is started and loaded with some tablets. One tablet server is then
// shut down and restarted with EIO injection flags targeting one of its data
// directories, and the test checks that the failure is reported and that the
// cluster still passes verification.
TEST_P(DiskFailureITest, TestFailDuringServerStartup) {
  const string block_manager_type = std::get<0>(GetParam());
  const bool is_3_4_3_mode = std::get<1>(GetParam());

  // Set up a cluster with five disks at each tablet server. In case of 3-4-3
  // replication scheme one more tablet server is needed to put the replacement
  // non-voter replica there.
  const auto kNumTabletServers = is_3_4_3_mode ? 4 : 3;
  const auto kNumTablets = 5;
  const auto kNumRows = 100;

  ExternalMiniClusterOptions opts;
  opts.num_tablet_servers = kNumTabletServers;
  opts.num_data_dirs = 5;
  opts.block_manager_type = block_manager_type;
  opts.extra_master_flags.push_back(
      Substitute("--raft_prepare_replacement_before_eviction=$0", is_3_4_3_mode));
  opts.extra_tserver_flags.push_back(
      Substitute("--raft_prepare_replacement_before_eviction=$0", is_3_4_3_mode));
  NO_FATALS(StartClusterWithOpts(opts));

  // Write some data to a tablet. This will spread blocks across all
  // directories.
  TestWorkload write_workload(cluster_.get());
  write_workload.set_num_tablets(kNumTablets);
  write_workload.Setup();
  write_workload.Start();
  ASSERT_EVENTUALLY([&] {
    // ASSERT_GT(a, b) verifies a > b: wait until more than kNumRows rows have
    // actually been inserted. With the operands the other way around the
    // check passes right away while the workload has inserted nothing.
    ASSERT_GT(write_workload.rows_inserted(), kNumRows);
  });
  write_workload.StopAndJoin();

  // Arbitrarily select one tablet server which hosts a replica of the tablet.
  ExternalTabletServer* ts = nullptr;
  for (const auto& e : ts_map_) {
    vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB> tablets;
    ASSERT_OK(itest::ListTablets(e.second, kAgreementTimeout, &tablets));
    if (!tablets.empty()) {
      ts = cluster_->tablet_server_by_uuid(e.first);
      break;
    }
  }
  ASSERT_NE(nullptr, ts);

  // Ensure at least one tablet gets to a running state at the selected tablet
  // server.
  ASSERT_OK(cluster_->WaitForTabletsRunning(ts, 1, kAgreementTimeout));

  // Introduce flags to fail one of the directories, avoiding the metadata
  // directory, the next time the tablet server starts.
  const string& failed_dir = ts->data_dirs()[1];
  const vector<string> extra_flags = {
    Substitute("--env_inject_eio_globs=$0", JoinPathSegments(failed_dir, "**")),
    "--env_inject_eio=1.0",
    "--crash_on_eio=false",
  };
  ts->mutable_flags()->insert(ts->mutable_flags()->begin(),
                              extra_flags.begin(), extra_flags.end());
  ts->Shutdown();

  // Restart the tablet server with disk failures and ensure it can startup.
  ASSERT_OK(ts->Restart());
  NO_FATALS(WaitForDiskFailures(ts));

  // Ensure that the tablets are successfully evicted and copied.
  ClusterVerifier v(cluster_.get());
  NO_FATALS(v.CheckCluster());
  NO_FATALS(v.CheckRowCount(write_workload.table_name(), ClusterVerifier::AT_LEAST,
                            write_workload.batches_completed()));
}
// Instantiates the DiskFailureITest cases for every combination of the block
// manager types returned by BlockManager::block_manager_types() and both
// values of the boolean parameter (read by the tests as the 3-4-3 mode switch).
INSTANTIATE_TEST_SUITE_P(DiskFailure, DiskFailureITest,
::testing::Combine(
::testing::ValuesIn(BlockManager::block_manager_types()),
::testing::Bool()));
// The kinds of errors the tests in this file inject into a server. See
// DiskErrorITestBase::InjectionFlags() for the flags each value maps to.
enum class ErrorType {
// Injected by setting the 'cfile_inject_corruption' flag.
CFILE_CORRUPTION,
// Injected by setting the 'env_inject_eio_globs' and 'env_inject_eio' flags.
DISK_FAILURE,
// Injected by setting the 'tablet_inject_kudu_2233' flag.
KUDU_2233_CORRUPTION,
};
// Common base for the error injection tests, parameterized by the type of
// error to inject. Provides helpers to build and apply injection flags.
class DiskErrorITestBase : public ExternalMiniClusterITestBase,
                           public ::testing::WithParamInterface<ErrorType> {
 public:
  // A list of (flag name, flag value) pairs.
  using FlagList = vector<pair<string, string>>;

  // Number of data directories the derived fixtures configure per server.
  static constexpr int kNumDataDirs = 3;

  // Applies every (name, value) entry of `flags` to `server`, in order.
  // Returns the first non-OK status, or OK once all flags have been set.
  Status SetFlags(ExternalDaemon* server, const FlagList& flags) const {
    for (const auto& name_and_value : flags) {
      const string& name = name_and_value.first;
      const string& value = name_and_value.second;
      RETURN_NOT_OK(cluster_->SetFlag(server, name, value));
    }
    return Status::OK();
  }

  // Builds the list of flags that turn on injection of `error`. For disk
  // failures the data directories of `error_server` are consulted to build
  // the EIO globs; the other error types do not use `error_server`.
  FlagList InjectionFlags(ErrorType error, ExternalDaemon* error_server) const {
    FlagList flags;
    switch (error) {
      case ErrorType::CFILE_CORRUPTION:
        flags.emplace_back("cfile_inject_corruption", "1.0");
        break;
      case ErrorType::KUDU_2233_CORRUPTION:
        flags.emplace_back("tablet_inject_kudu_2233", "1.0");
        break;
      case ErrorType::DISK_FAILURE: {
        // Only the second and the third data directories are targeted: the
        // first data directory is left free of injected errors.
        const string second_dir_glob =
            JoinPathSegments(error_server->data_dirs()[1], "**");
        const string third_dir_glob =
            JoinPathSegments(error_server->data_dirs()[2], "**");
        flags.emplace_back("env_inject_eio_globs",
                           Substitute("$0,$1", second_dir_glob, third_dir_glob));
        flags.emplace_back("env_inject_eio", "1.0");
        break;
      }
    }
    return flags;
  }
};
// A generalized test for different kinds of disk errors in tablet servers.
class TabletServerDiskErrorITest : public DiskErrorITestBase {
 public:
  // Number of tablets created by the write workload in SetUp().
  static constexpr int kNumTablets = 10;

  // Set up a cluster with 4 tservers, with `kNumTablets` spread across the
  // first three tservers. This ensures that injecting failures into any of the
  // first three tservers will hit all tablets.
  //
  // Also configure the cluster to not delete or copy tablets, even on error.
  // This allows us to check all tablets are failed appropriately.
  void SetUp() override {
    constexpr int kNumRows = 10000;
    ExternalMiniClusterOptions opts;
    // Use 3 tservers at first; we'll add an empty one later.
    opts.num_tablet_servers = 3;
    opts.num_data_dirs = kNumDataDirs;
    opts.extra_tserver_flags = {
      // Flush frequently so we actually get some data blocks.
      "--flush_threshold_secs=1",
      "--flush_threshold_mb=1",
    };
    opts.extra_master_flags = {
      // Prevent the master from tombstoning replicas that may not be part of
      // the config (e.g. if a leader fails, it can be "evicted", despite
      // setting `--evict_failed_follower=false`)
      "--master_tombstone_evicted_tablet_replicas=false"
    };
    NO_FATALS(StartClusterWithOpts(std::move(opts)));

    // Write some rows to the three servers.
    // Uses HASH partitioning to be sure we hit all tablets.
    TestWorkload writes(cluster_.get(), TestWorkload::PartitioningType::HASH);
    writes.set_num_tablets(kNumTablets);
    writes.Setup();
    writes.Start();
    while (writes.rows_inserted() < kNumRows) {
      SleepFor(MonoDelta::FromMilliseconds(10));
    }
    NO_FATALS(writes.StopAndJoin());

    // Now add the last server. Fail the setup right away if the server could
    // not be added instead of silently dropping the returned status: the
    // tests below rely on the extra, empty server being available.
    ASSERT_OK(cluster_->AddTabletServer());
    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
      // Prevent attempts to copy over replicas, e.g. ones that don't get to a
      // running state due to an error.
      ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i), "enable_tablet_copy", "false"));
    }
  }

  // Set the flags that would allow for the recovery of failed tablets: turns
  // off every kind of error injection and re-enables tablet copying on each
  // tablet server of the cluster. Returns the first non-OK status encountered
  // while setting the flags, or OK otherwise.
  Status AllowRecovery() const {
    LOG(INFO) << "Resetting error injection flags";
    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
      const FlagList recovery_flags = {
        // First, stop injecting errors.
        { "env_inject_eio", "0.0" },
        { "cfile_inject_corruption", "0.0" },
        { "tablet_inject_kudu_2233", "0.0" },
        // Then allow for recovery.
        { "enable_tablet_copy", "true" },
      };
      RETURN_NOT_OK(SetFlags(cluster_->tablet_server(i), recovery_flags));
    }
    return Status::OK();
  }

  // Waits for the number of failed tablets on the tablet server to reach
  // `num_failed`. If `require_all_fail` is false, any positive number of
  // failed tablets is accepted as well.
  void WaitForFailedTablets(ExternalTabletServer* ts, int num_failed,
                            bool require_all_fail = true) const {
    ASSERT_EVENTUALLY([&] {
      int64_t failed_on_ts;
      ASSERT_OK(itest::GetInt64Metric(ts->bound_http_hostport(),
          &METRIC_ENTITY_server, nullptr, &METRIC_tablets_num_failed, "value", &failed_on_ts));
      LOG(INFO) << "Currently has " << failed_on_ts << " failed tablets";
      ASSERT_TRUE(failed_on_ts == num_failed || (!require_all_fail && failed_on_ts > 0));
    });
  }
};
// Runs the TabletServerDiskErrorITest cases with CFile corruption and with
// disk failure as the injected error type.
INSTANTIATE_TEST_SUITE_P(TabletServerDiskError, TabletServerDiskErrorITest,
::testing::Values(ErrorType::CFILE_CORRUPTION, ErrorType::DISK_FAILURE));
// Restarts a tablet server hosting replicas with error injection enabled via
// its command line flags, expects all of its tablets to fail, and then checks
// that the cluster gets back to a healthy state once recovery is allowed.
TEST_P(TabletServerDiskErrorITest, TestFailOnBootstrap) {
  // Inject the errors into one of the non-empty servers.
  ExternalTabletServer* error_ts = cluster_->tablet_server(0);
  // Iterate by const reference: there is no need to copy each pair of strings.
  for (const auto& flag_pair : InjectionFlags(GetParam(), error_ts)) {
    error_ts->mutable_flags()->emplace_back(
        Substitute("--$0=$1", flag_pair.first, flag_pair.second));
  }
  error_ts->Shutdown();
  LOG(INFO) << "Restarting server with injected errors...";
  ASSERT_OK(error_ts->Restart());

  // Wait for all the tablets to reach a failed state.
  NO_FATALS(WaitForFailedTablets(error_ts, kNumTablets));
  ASSERT_OK(AllowRecovery());

  // Wait for the cluster to return to a healthy state.
  ClusterVerifier v(cluster_.get());
  NO_FATALS(v.CheckCluster());
}
TEST_P(TabletServerDiskErrorITest, TestSpaceAvailableMetrics) {
  // Fetch wal_dir_space_available_bytes and data_dirs_space_available_bytes
  // and make sure they are not -1. Comparing the metrics against the currently
  // available space with ASSERT_EQ would most likely fail, since some data is
  // likely to be written between the two calls; so only check that the values
  // are greater than 0. The metrics are then fetched a second time to ensure
  // the values are cached: by default they are cached for 10 seconds, as
  // specified by the flags --fs_wal_dir_available_space_cache_seconds and
  // --fs_data_dirs_available_space_cache_seconds.
  const auto get_metrics = [&] (int64_t* wal_dir_space, int64_t* data_dir_space) {
    RETURN_NOT_OK(itest::GetInt64Metric(
        cluster_->tablet_server(0)->bound_http_hostport(),
        &METRIC_ENTITY_server,
        nullptr,
        &METRIC_wal_dir_space_available_bytes,
        "value",
        wal_dir_space));
    return itest::GetInt64Metric(
        cluster_->tablet_server(0)->bound_http_hostport(),
        &METRIC_ENTITY_server,
        nullptr,
        &METRIC_data_dirs_space_available_bytes,
        "value",
        data_dir_space);
  };

  // First fetch: both values must be positive.
  int64_t wal_dir_space;
  int64_t data_dir_space;
  ASSERT_OK(get_metrics(&wal_dir_space, &data_dir_space));
  ASSERT_GT(wal_dir_space, 0);
  ASSERT_GT(data_dir_space, 0);

  // Second fetch: the values must be the same as the ones fetched before.
  int64_t wal_dir_space_refetch;
  int64_t data_dir_space_refetch;
  ASSERT_OK(get_metrics(&wal_dir_space_refetch, &data_dir_space_refetch));
  ASSERT_EQ(wal_dir_space, wal_dir_space_refetch);
  ASSERT_EQ(data_dir_space, data_dir_space_refetch);

  // Inject EIO into one of the data directories, restart the server, and
  // check that data_dirs_space_available_bytes is now equal to -1 while
  // wal_dir_space_available_bytes is not.
  ExternalTabletServer* error_ts = cluster_->tablet_server(0);
  const string eio_glob = JoinPathSegments(error_ts->data_dirs()[1], "**");
  error_ts->mutable_flags()->emplace_back(
      Substitute("--env_inject_eio_globs=$0", eio_glob));
  error_ts->mutable_flags()->emplace_back("--env_inject_eio=1.0");
  error_ts->Shutdown();
  ASSERT_OK(error_ts->Restart());

  ASSERT_OK(get_metrics(&wal_dir_space, &data_dir_space));
  ASSERT_NE(wal_dir_space, -1);
  ASSERT_EQ(data_dir_space, -1);
}
TEST_P(TabletServerDiskErrorITest, TestFailDuringScanWorkload) {
  // Make the first server more likely to host leader replicas: its Raft
  // heartbeat interval is much shorter than that of the others, so it starts
  // leader elections faster when not hearing from former leaders.
  ExternalTabletServer* error_ts = cluster_->tablet_server(0);
  error_ts->mutable_flags()->emplace_back("--raft_heartbeat_interval_ms=50");
  error_ts->Shutdown();
  ASSERT_OK(error_ts->Restart());

  // Make the other servers less likely to host leader replicas: their Raft
  // heartbeat interval is much longer than that of 'error_ts', so they detect
  // leader failures more slowly than the replicas hosted at 'error_ts'.
  for (auto idx = 1; idx < cluster_->num_tablet_servers(); ++idx) {
    auto* other_ts = cluster_->tablet_server(idx);
    other_ts->mutable_flags()->emplace_back("--raft_heartbeat_interval_ms=250");
    other_ts->Shutdown();
    // Give the remaining tablet servers time to elect new leaders for the
    // tablets whose former leaders were hosted by the tablet server that has
    // just been shut down.
    SleepFor(MonoDelta::FromSeconds(1));
    ASSERT_OK(other_ts->Restart());
  }

  // Inject errors into every IO operation at one of the servers.
  const FlagList injection_flags = InjectionFlags(GetParam(), error_ts);
  ASSERT_OK(SetFlags(error_ts, injection_flags));

  // Make sure 'error_ts' hosts at least one leader replica for some tablet
  // that the read workload is about to fetch data from. The only existing
  // table in the cluster is the one created by
  // TabletServerDiskErrorITest::SetUp().
  ASSERT_EVENTUALLY([&] {
    int64_t leader_count = 0;
    ASSERT_OK(itest::GetInt64Metric(
        cluster_->tablet_server_by_uuid(error_ts->uuid())->bound_http_hostport(),
        &METRIC_ENTITY_server,
        nullptr,
        &METRIC_num_raft_leaders,
        "value",
        &leader_count));
    ASSERT_GT(leader_count, 0);
  });

  // Now start a workload that reads only from leader replicas. Some leader
  // replicas should be hosted by 'error_ts', and each attempted scan is going
  // to fail due to the injected errors. So, at least one tablet replica
  // is about to be marked as failed at 'error_ts' as a result.
  TestWorkload scan_workload(cluster_.get());
  scan_workload.set_num_write_threads(0);
  scan_workload.set_num_read_threads(1);
  scan_workload.set_scanner_selection(KuduClient::ReplicaSelection::LEADER_ONLY);
  scan_workload.Setup();
  scan_workload.Start();

  // Wait for at least one tablet replica to reach failed state.
  NO_FATALS(WaitForFailedTablets(error_ts, kNumTablets, /*require_all_fail*/false));

  // Remove all the injected IO failures.
  ASSERT_OK(AllowRecovery());
  NO_FATALS(scan_workload.StopAndJoin());

  // Verify the cluster can get to a healthy state.
  ClusterVerifier verifier(cluster_.get());
  NO_FATALS(verifier.CheckCluster());
}
// Test targeting KUDU-2233, though reused for additional coverage of CFile
// checksum failures and disk errors.
class CompactionsAndDeletionsFailureITest : public TabletServerDiskErrorITest {
public:
void SetUp() override {
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = 3;
opts.num_data_dirs = kNumDataDirs;
opts.extra_tserver_flags = {
// Flush frequently so we actually get some data blocks.
"--flush_threshold_secs=1",
"--flush_threshold_mb=1",
// Prevent compactions so we can explicitly enable them later.
"--enable_rowset_compaction=false",
// Prevent tablet copies so we can explicitly enable and monitor them.
"--enable_tablet_copy=false",
};
NO_FATALS(StartClusterWithOpts(std::move(opts)));
workload_.reset(new TestWorkload(cluster_.get()));
workload_->set_num_tablets(1);
workload_->Setup();
}
protected:
unique_ptr<TestWorkload> workload_;
};
// Inserts and deletes rows until the tablet has multiple rowsets on disk, then
// enables compactions together with error injection, expects the replica to
// fail, and checks the cluster recovers once recovery is allowed.
TEST_P(CompactionsAndDeletionsFailureITest, TestRecovery) {
  constexpr int kKeyMax = 10;

  shared_ptr<KuduTable> table;
  shared_ptr<KuduSession> session = client_->NewSession();
  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
  ASSERT_OK(client_->OpenTable(workload_->table_name(), &table));

  // Inserts a row with the given key and flushes the session.
  const auto insert_row = [&] (int key) {
    std::unique_ptr<KuduInsert> ins_op(table->NewInsert());
    KuduPartialRow* ins_row = ins_op->mutable_row();
    RETURN_NOT_OK(ins_row->SetInt32("key", key));
    RETURN_NOT_OK(ins_row->SetInt32("int_val", key));
    RETURN_NOT_OK(session->Apply(ins_op.release()));
    return session->Flush();
  };

  // Deletes the row with the given key and flushes the session.
  const auto delete_row = [&] (int key) {
    std::unique_ptr<KuduDelete> del_op(table->NewDelete());
    KuduPartialRow* del_row = del_op->mutable_row();
    RETURN_NOT_OK(del_row->SetInt32("key", key));
    RETURN_NOT_OK(session->Apply(del_op.release()));
    return session->Flush();
  };

  // Returns the 'num_rowsets_on_disk' metric of the given tablet, as reported
  // by the given tablet server.
  const auto get_num_diskrowsets = [&] (const ExternalTabletServer* ets,
                                        const char* tablet_id) {
    int64_t rowset_count = 0;
    CHECK_OK(itest::GetInt64Metric(ets->bound_http_hostport(),
                                   &METRIC_ENTITY_tablet,
                                   tablet_id,
                                   &METRIC_num_rowsets_on_disk,
                                   "value",
                                   &rowset_count));
    return rowset_count;
  };

  ExternalTabletServer* error_ts = cluster_->tablet_server(0);
  itest::TServerDetails* ts = ts_map_[error_ts->uuid()];
  vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 1, MonoDelta::FromSeconds(32), &tablets));
  ASSERT_EQ(1, tablets.size());
  const string& tablet_id = tablets[0].tablet_status().tablet_id();

  // Insert and delete rows, cycling through the keys, and continue until we
  // get multiple rowsets.
  for (int num_inserts = 0;
       get_num_diskrowsets(error_ts, tablet_id.c_str()) < 2;
       ++num_inserts) {
    const auto key = num_inserts % kKeyMax;
    ASSERT_OK(insert_row(key));
    ASSERT_OK(delete_row(key));
  }

  // Enable compactions, which will trigger KUDU-2233 codepaths.
  auto flag_list = InjectionFlags(GetParam(), error_ts);
  flag_list.emplace_back("enable_rowset_compaction", "true");
  ASSERT_OK(SetFlags(error_ts, flag_list));

  // The tablet replica should fail and be re-replicated back up to full
  // health once allowed.
  NO_FATALS(WaitForFailedTablets(error_ts, 1));
  ASSERT_OK(AllowRecovery());

  // Wait for the cluster to return to a healthy state.
  ClusterVerifier verifier(cluster_.get());
  NO_FATALS(verifier.CheckCluster());
}
// Runs the CompactionsAndDeletionsFailureITest cases with each of the three
// error types: CFile corruption, disk failure, and KUDU-2233 corruption.
INSTANTIATE_TEST_SUITE_P(ErrorType, CompactionsAndDeletionsFailureITest,
::testing::Values(ErrorType::CFILE_CORRUPTION, ErrorType::DISK_FAILURE,
ErrorType::KUDU_2233_CORRUPTION));
// Fixture for the master error injection test. It adds nothing on top of
// DiskErrorITestBase; the test using it starts its own cluster.
class MasterDiskErrorITest : public DiskErrorITestBase {
};
// Runs the MasterDiskErrorITest cases with CFile corruption and with disk
// failure as the injected error type.
INSTANTIATE_TEST_SUITE_P(MasterDiskError, MasterDiskErrorITest,
::testing::Values(ErrorType::CFILE_CORRUPTION, ErrorType::DISK_FAILURE));
// Test that triggers a disk error in the master during maintenance manager
// operations like compaction, and expects the master to crash as a result.
TEST_P(MasterDiskErrorITest, TestMasterDiskFailure) {
  constexpr int kNumReplicas = 1;
  constexpr int kNumTables = 20;

  ExternalMiniClusterOptions cluster_opts;
  cluster_opts.num_tablet_servers = kNumReplicas;
  cluster_opts.num_data_dirs = kNumDataDirs;
  cluster_opts.extra_master_flags = {
    // Flush frequently so we actually get some disk row sets
    "--flush_threshold_secs=1",
    "--flush_threshold_mb=1",
    "--enable_rowset_compaction=false"
  };
  NO_FATALS(StartClusterWithOpts(std::move(cluster_opts)));

  // Create a bunch of tables to populate the system catalog with overlapping
  // entries that'll require compaction of the disk row sets.
  std::unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
  auto client_schema = client::KuduSchema::FromSchema(GetSimpleTestSchema());
  for (int table_idx = 0; table_idx < kNumTables; ++table_idx) {
    const string table_name = Substitute("test-$0", table_idx);
    LOG(INFO) << "Creating table " << table_name;
    ASSERT_OK(table_creator->table_name(table_name)
                  .schema(&client_schema)
                  .set_range_partition_columns({ "key" })
                  .num_replicas(kNumReplicas)
                  .wait(true)
                  .Create());
    SleepFor(MonoDelta::FromMilliseconds(100));
  }

  // Trigger disk failure and enable compaction.
  auto* const leader_master = cluster_->leader_master();
  auto injection_flags = InjectionFlags(GetParam(), leader_master);
  injection_flags.emplace_back("enable_rowset_compaction", "true");
  ASSERT_OK(SetFlags(leader_master, injection_flags));

  // Wait for the master to crash
  ASSERT_OK(leader_master->WaitForFatal(MonoDelta::FromSeconds(20)));
}
} // namespace kudu