KUDU-3452 Allow creating under-replicated tablets of replicated tables
Currently, creating a three-replica table when there are
fewer than 3 healthy tablet servers will fail, and the system
catalog will retry continuously and always fail until the
unavailable tablet servers become healthy again.
An under-replicated table is still available for reading and
writing, so it's enough to place just a majority of replicas
for each tablet at healthy tablet servers to make a newly
created table ready to use.
This patch adds a new flag:
--allow_creating_under_replicated_tables to support this
feature. The original logic is kept the same. When this flag
is set true, it's possible to create a tablet placing just a
majority of replicas at healthy tablet servers. Even if the
new tablet is created under-replicated, it's still available
for read and write operations.
Change-Id: I742ba1ff770f5c8b1be5800334c29bec96e195c6
Reviewed-on: http://gerrit.cloudera.org:8080/19571
Tested-by: Kudu Jenkins
Reviewed-by: Yifan Zhang <chinazhangyifan@163.com>
Reviewed-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Yuqi Du <shenxingwuying@gmail.com>
Reviewed-by: Yingchun Lai <laiyingchun@apache.org>
diff --git a/src/kudu/integration-tests/create-table-itest.cc b/src/kudu/integration-tests/create-table-itest.cc
index b0969ae..f74af64 100644
--- a/src/kudu/integration-tests/create-table-itest.cc
+++ b/src/kudu/integration-tests/create-table-itest.cc
@@ -33,6 +33,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include <rapidjson/document.h>
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
@@ -42,6 +43,7 @@
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/gutil/mathlimits.h"
+#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
@@ -51,6 +53,7 @@
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/mini-cluster/mini_cluster.h"
#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tools/tool_test_util.h"
#include "kudu/util/atomic.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -76,6 +79,7 @@
using client::KuduClient;
using client::KuduSchema;
+using client::sp::shared_ptr;
using cluster::ClusterNodes;
const char* const kTableName = "test-table";
@@ -625,4 +629,152 @@
ASSERT_OK(client_->DeleteTable(kTableName));
}
+class NotEnoughHealthyTServersTest :
+ public CreateTableITest,
+ public ::testing::WithParamInterface<bool> {
+};
+
+INSTANTIATE_TEST_SUITE_P(AddNewTS, NotEnoughHealthyTServersTest, ::testing::Bool());
+
+TEST_P(NotEnoughHealthyTServersTest, TestNotEnoughHealthyTServers) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+ const auto add_new_ts = GetParam();
+ constexpr const char* const kNotEnoughReplicasTableName = "kudu.not_enough_replicas";
+ constexpr const char* const kOverRegistedTSTableName = "kudu.over.registed.ts";
+ constexpr const char* const kFiveReplicaTableName = "kudu.five.replica";
+ constexpr const char* const kOneReplicaTableName = "kudu.one.replica";
+ constexpr int kNumTabletServers = 5;
+ constexpr int kTSUnresponsiveTimeoutMs = 4000;
+ constexpr int kHeartbeatIntervalMs = 3000;
+
+ // Do not validate the number of replicas.
+ vector<string> master_flags = {
+ "--catalog_manager_check_ts_count_for_create_table=false",
+ "--catalog_manager_check_ts_count_for_alter_table=false",
+ Substitute("--tserver_unresponsive_timeout_ms=$0", kTSUnresponsiveTimeoutMs),
+ Substitute("--heartbeat_interval_ms=$0", kHeartbeatIntervalMs),
+ "--allow_unsafe_replication_factor=true",
+ "--allow_creating_under_replicated_tables=true"
+ };
+ NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
+
+ string master_address = cluster_->master()->bound_rpc_addr().ToString();
+
+ auto client_schema = KuduSchema::FromSchema(GetSimpleTestSchema());
+ auto create_table_func = [&](const string& table_name, int replica_num) -> Status {
+ unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
+ return table_creator->table_name(table_name)
+ .schema(&client_schema)
+ .set_range_partition_columns({ "key" })
+ .num_replicas(replica_num)
+ .Create();
+ };
+
+ // The number of replicas can't be over the number of registered tablet servers.
+ // RF = 6.
+ {
+ Status s = create_table_func(kOverRegistedTSTableName, 6);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "not enough registered tablet servers to");
+ shared_ptr<client::KuduTable> table;
+ s = client_->OpenTable(kOverRegistedTSTableName, &table);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ }
+
+ {
+ // Shutdown 3 tablet servers.
+ for (int i = 0; i < 3; i++) {
+ NO_FATALS(cluster_->tablet_server(i)->Shutdown());
+ }
+    // Wait for the 3 tablet servers' heartbeats to time out and for the unresponsive
+    // timeout to elapse; the catalog manager will then treat them as unavailable
+    // tablet servers. KSCK gets the status of a tablet server from the tablet server
+    // interface, so here we must wait for the catalog manager to mark them as
+    // unavailable.
+ SleepFor(MonoDelta::FromMilliseconds(3*(kTSUnresponsiveTimeoutMs + kHeartbeatIntervalMs)));
+ }
+
+ // RF = 5. Creating table will fail because there are not enough live tablet servers.
+ {
+ Status s = create_table_func(kOverRegistedTSTableName, 5);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "not enough live tablet servers to");
+ }
+
+ {
+ // Restart the first tablet server.
+ NO_FATALS(cluster_->tablet_server(0)->Restart());
+    // Wait for the restarted tablet server to send a heartbeat and be registered
+    // in the catalog manager.
+ SleepFor(MonoDelta::FromMilliseconds(kHeartbeatIntervalMs));
+ }
+
+ // Create a table with RF=5. It should succeed.
+ ASSERT_OK(create_table_func(kFiveReplicaTableName, 5));
+
+ {
+ // Restart the second tablet server.
+ NO_FATALS(cluster_->tablet_server(1)->Restart());
+    // Wait for the restarted tablet server to send a heartbeat and be registered
+    // in the catalog manager.
+ SleepFor(MonoDelta::FromMilliseconds(kHeartbeatIntervalMs));
+ }
+
+ // RF = 1.
+ ASSERT_OK(create_table_func(kOneReplicaTableName, 1));
+
+  // Create a five-replica table.
+ ASSERT_OK(create_table_func(kNotEnoughReplicasTableName, 5));
+
+ // Add another column. Test alter table logic.
+ {
+ shared_ptr<KuduClient> client;
+ ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+ unique_ptr<client::KuduTableAlterer> table_alterer(
+ client_->NewTableAlterer(kNotEnoughReplicasTableName));
+ table_alterer->AddColumn("new_column")->Type(client::KuduColumnSchema::INT32);
+ ASSERT_OK(table_alterer->Alter());
+ }
+
+ {
+ string out;
+ string cmd = Substitute(
+ "cluster ksck $0 --sections=TABLE_SUMMARIES --ksck_format=json_compact", master_address);
+ kudu::tools::RunKuduTool(strings::Split(cmd, " ", strings::SkipEmpty()), &out);
+ rapidjson::Document doc;
+ doc.Parse<0>(out.c_str());
+ ASSERT_EQ(3, doc["table_summaries"].Size());
+ const rapidjson::Value& items = doc["table_summaries"];
+ for (int i = 0; i < items.Size(); i++) {
+ if (string(kOneReplicaTableName) == items[i]["name"].GetString()) {
+ ASSERT_EQ(string("HEALTHY"), items[i]["health"].GetString());
+ } else {
+ ASSERT_EQ(string("UNDER_REPLICATED"), items[i]["health"].GetString());
+ }
+ }
+ }
+
+ if (add_new_ts) {
+ // Add one new tablet server.
+ NO_FATALS(cluster_->AddTabletServer());
+ } else {
+    // Restart the remaining stopped tablet server.
+ NO_FATALS(cluster_->tablet_server(2)->Restart());
+ }
+
+ // All tables will become healthy.
+ {
+ ASSERT_EVENTUALLY([&] {
+ string out;
+ string in;
+ string cmd = Substitute(
+ "cluster ksck $0 --sections=TABLE_SUMMARIES --ksck_format=json_compact", master_address);
+ kudu::tools::RunKuduTool(strings::Split(cmd, " ", strings::SkipEmpty()), &out, nullptr, in);
+ rapidjson::Document doc;
+ doc.Parse<0>(out.c_str());
+ ASSERT_EQ(3, doc["table_summaries"].Size());
+ const rapidjson::Value& items = doc["table_summaries"];
+ for (int i = 0; i < items.Size(); i++) {
+ ASSERT_EQ(string("HEALTHY"), items[i]["health"].GetString());
+ }
+ });
+ }
+}
} // namespace kudu
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index d54c083..f50c724 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -414,6 +414,12 @@
TAG_FLAG(require_new_spec_for_custom_hash_schema_range_bound, experimental);
TAG_FLAG(require_new_spec_for_custom_hash_schema_range_bound, runtime);
+DEFINE_bool(allow_creating_under_replicated_tables, false,
+            "Whether to allow creating a tablet when there are enough healthy tablet servers "
+ "to place just the majority of tablet replicas");
+TAG_FLAG(allow_creating_under_replicated_tables, experimental);
+TAG_FLAG(allow_creating_under_replicated_tables, hidden);
+
DEFINE_uint32(default_deleted_table_reserve_seconds, 0,
"Time in seconds to be reserved before purging a deleted table for the table "
"from DeleteTable request. Value 0 means DeleteTable() works the regular way, "
@@ -6011,14 +6017,30 @@
TabletInfo* tablet) {
DCHECK(tablet);
TableMetadataLock table_guard(tablet->table().get(), LockMode::READ);
-
if (!table_guard.data().pb.IsInitialized()) {
return Status::InvalidArgument(
Substitute("TableInfo for tablet $0 is not initialized (aborted CreateTable attempt?)",
tablet->id()));
}
- const auto nreplicas = table_guard.data().pb.num_replicas();
+ int nreplicas = table_guard.data().pb.num_replicas();
+  // Try to place just a majority of replicas for the tablet when the number of
+  // live tablet servers is less than the required number of replicas.
+ if (policy.ts_num() < nreplicas &&
+ FLAGS_allow_creating_under_replicated_tables) {
+ // The Raft protocol requires at least (replication_factor / 2) + 1 live replicas
+ // to form a quorum. Since Kudu places at most one replica of a tablet at one tablet
+ // server, it's necessary to have at least (replication_factor / 2) + 1 live tablet
+ // servers in a cluster to allow the tablet to serve read and write requests.
+ if (policy.ts_num() < consensus::MajoritySize(nreplicas)) {
+      return Status::InvalidArgument(
+          Substitute("need at least $0 out of $1 replicas to form a Raft quorum, "
+                     "but only $2 tablet servers are online",
+                     consensus::MajoritySize(nreplicas), nreplicas, policy.ts_num()));
+ }
+ nreplicas = policy.ts_num();
+ }
+
if (policy.ts_num() < nreplicas) {
return Status::InvalidArgument(
Substitute("Not enough tablet servers are online for table '$0'. Need at least $1 "
@@ -6669,6 +6691,31 @@
num_ts_needed_for_rereplication, num_live_tservers);
}
+ // Verify that the number of replicas isn't greater than the number of registered
+ // tablet servers in the cluster. This is to make sure the cluster can provide the
+ // expected HA guarantees when all tablet servers are online. Essentially, this
+ // allows to be sure no tablet stays under-replicated for indefinite time when all
+ // the nodes of the cluster are online.
+ TSDescriptorVector registered_ts_descs;
+  master_->ts_manager()->GetAllDescriptors(&registered_ts_descs);
+ if (FLAGS_allow_creating_under_replicated_tables) {
+ if (num_replicas > registered_ts_descs.size()) {
+ return SetupError(Status::InvalidArgument(Substitute(
+ "not enough registered tablet servers to $0 a table with the requested replication "
+ "factor $1; $2 tablet servers are registered",
+ type == ValidateType::kCreateTable ? "create" : "alter",
+ num_replicas, registered_ts_descs.size())),
+ resp, MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH);
+ }
+ if (num_live_tservers < consensus::MajoritySize(num_replicas)) {
+ return SetupError(Status::InvalidArgument(Substitute(
+ "not enough live tablet servers to $0 a table with the requested replication "
+ "factor $1; $2/$3 tablet servers are alive",
+ type == ValidateType::kCreateTable ? "create" : "alter",
+ num_replicas, num_live_tservers, registered_ts_descs.size())),
+ resp, MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH);
+ }
+ }
return Status::OK();
}