KUDU-3452 Allow creating tablets under replicated tables

Currently, creating a table with three replicas when there are
fewer than 3 healthy tablet servers fails, and the system
catalog retries continuously, failing every time until the
unavailable tablet servers become healthy again.

An under-replicated table is still available for reading and
writing, so placing just a majority of replicas for each tablet
on healthy tablet servers is enough to make a newly created
table ready to use.
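
For reference, the arithmetic behind "a majority of replicas"
(a minimal standalone sketch; the function is written out here
for illustration and mirrors consensus::MajoritySize() in
src/kudu/consensus/quorum_util.h):

    // A Raft configuration of N voters needs floor(N / 2) + 1 live
    // replicas to elect a leader and accept writes.
    int MajoritySize(int num_voters) {
      return (num_voters / 2) + 1;
    }
    // MajoritySize(3) == 2: an RF=3 tablet needs 2 healthy tablet servers.
    // MajoritySize(5) == 3: an RF=5 tablet needs 3 healthy tablet servers.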

This patch adds a new flag,
--allow_creating_under_replicated_tables, to support this
feature. The original logic is preserved when the flag is off
(the default). When the flag is set to true, a tablet can be
created with just a majority of its replicas placed on healthy
tablet servers. Even though such a tablet starts out
under-replicated, it is still available for read and write
operations.
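
As an illustration only (not part of this patch; the master
address, schema, and table name below are placeholders), with
the flag enabled on the master a client can create an RF=3
table while only two tablet servers are up:

    #include <memory>
    #include <string>

    #include "kudu/client/client.h"

    using kudu::client::KuduClient;
    using kudu::client::KuduClientBuilder;
    using kudu::client::KuduSchema;
    using kudu::client::KuduTableCreator;

    kudu::Status CreateWithMajorityPlacement(const std::string& master_addr,
                                             const KuduSchema& schema) {
      kudu::client::sp::shared_ptr<KuduClient> client;
      KUDU_RETURN_NOT_OK(KuduClientBuilder()
                             .add_master_server_addr(master_addr)
                             .Build(&client));
      std::unique_ptr<KuduTableCreator> creator(client->NewTableCreator());
      // With --allow_creating_under_replicated_tables=true on the master,
      // this succeeds once 2 of the 3 replicas can be placed; the tablet
      // is under-replicated but serves reads and writes immediately.
      return creator->table_name("example.majority_placed")
          .schema(&schema)
          .set_range_partition_columns({ "key" })
          .num_replicas(3)
          .Create();
    }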

Change-Id: I742ba1ff770f5c8b1be5800334c29bec96e195c6
Reviewed-on: http://gerrit.cloudera.org:8080/19571
Tested-by: Kudu Jenkins
Reviewed-by: Yifan Zhang <chinazhangyifan@163.com>
Reviewed-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Yuqi Du <shenxingwuying@gmail.com>
Reviewed-by: Yingchun Lai <laiyingchun@apache.org>
diff --git a/src/kudu/integration-tests/create-table-itest.cc b/src/kudu/integration-tests/create-table-itest.cc
index b0969ae..f74af64 100644
--- a/src/kudu/integration-tests/create-table-itest.cc
+++ b/src/kudu/integration-tests/create-table-itest.cc
@@ -33,6 +33,7 @@
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
+#include <rapidjson/document.h>
 
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
@@ -42,6 +43,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/gutil/mathlimits.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
@@ -51,6 +53,7 @@
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/tools/tool_test_util.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -76,6 +79,7 @@
 
 using client::KuduClient;
 using client::KuduSchema;
+using client::sp::shared_ptr;
 using cluster::ClusterNodes;
 
 const char* const kTableName = "test-table";
@@ -625,4 +629,152 @@
   ASSERT_OK(client_->DeleteTable(kTableName));
 }
 
+class NotEnoughHealthyTServersTest :
+    public CreateTableITest,
+    public ::testing::WithParamInterface<bool> {
+};
+
+INSTANTIATE_TEST_SUITE_P(AddNewTS, NotEnoughHealthyTServersTest, ::testing::Bool());
+
+TEST_P(NotEnoughHealthyTServersTest, TestNotEnoughHealthyTServers) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  const auto add_new_ts = GetParam();
+  constexpr const char* const kNotEnoughReplicasTableName = "kudu.not_enough_replicas";
+  constexpr const char* const kOverRegisteredTSTableName = "kudu.over.registered.ts";
+  constexpr const char* const kFiveReplicaTableName = "kudu.five.replica";
+  constexpr const char* const kOneReplicaTableName = "kudu.one.replica";
+  constexpr int kNumTabletServers = 5;
+  constexpr int kTSUnresponsiveTimeoutMs = 4000;
+  constexpr int kHeartbeatIntervalMs = 3000;
+
+  // Do not validate the number of replicas.
+  vector<string> master_flags = {
+      "--catalog_manager_check_ts_count_for_create_table=false",
+      "--catalog_manager_check_ts_count_for_alter_table=false",
+      Substitute("--tserver_unresponsive_timeout_ms=$0", kTSUnresponsiveTimeoutMs),
+      Substitute("--heartbeat_interval_ms=$0", kHeartbeatIntervalMs),
+      "--allow_unsafe_replication_factor=true",
+      "--allow_creating_under_replicated_tables=true"
+  };
+  NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
+
+  string master_address = cluster_->master()->bound_rpc_addr().ToString();
+
+  auto client_schema = KuduSchema::FromSchema(GetSimpleTestSchema());
+  auto create_table_func = [&](const string& table_name, int replica_num) -> Status {
+    unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
+    return table_creator->table_name(table_name)
+                  .schema(&client_schema)
+                  .set_range_partition_columns({ "key" })
+                  .num_replicas(replica_num)
+                  .Create();
+  };
+
+  // The number of replicas can't be over the number of registered tablet servers.
+  // RF = 6.
+  {
+    Status s = create_table_func(kOverRegisteredTSTableName, 6);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not enough registered tablet servers to");
+    shared_ptr<client::KuduTable> table;
+    s = client_->OpenTable(kOverRegisteredTSTableName, &table);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+
+  {
+    // Shutdown 3 tablet servers.
+    for (int i = 0; i < 3; i++) {
+      NO_FATALS(cluster_->tablet_server(i)->Shutdown());
+    }
+    // Wait for the 3 tablet servers to miss their heartbeats and exceed the
+    // unresponsive timeout, after which the catalog manager considers them
+    // unavailable. KSCK gets a tablet server's status from the tablet server
+    // interface, so here we must wait for the catalog manager itself to mark
+    // them as unavailable.
+    SleepFor(MonoDelta::FromMilliseconds(3 * (kTSUnresponsiveTimeoutMs + kHeartbeatIntervalMs)));
+  }
+
+  // RF = 5. Creating table will fail because there are not enough live tablet servers.
+  {
+    Status s = create_table_func(kOverRegisteredTSTableName, 5);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not enough live tablet servers to");
+  }
+
+  {
+    // Restart the first tablet server.
+    NO_FATALS(cluster_->tablet_server(0)->Restart());
+    // Wait for the restarted tablet server to heartbeat and register with the catalog manager.
+    SleepFor(MonoDelta::FromMilliseconds(kHeartbeatIntervalMs));
+  }
+
+  // Create a table with RF=5. It should succeed.
+  ASSERT_OK(create_table_func(kFiveReplicaTableName, 5));
+
+  {
+    // Restart the second tablet server.
+    NO_FATALS(cluster_->tablet_server(1)->Restart());
+    // Wait for the restarted tablet server to heartbeat and register with the catalog manager.
+    SleepFor(MonoDelta::FromMilliseconds(kHeartbeatIntervalMs));
+  }
+
+  // RF = 1.
+  ASSERT_OK(create_table_func(kOneReplicaTableName, 1));
+
+  // Create another table with RF=5.
+  ASSERT_OK(create_table_func(kNotEnoughReplicasTableName, 5));
+
+  // Add another column to exercise the alter-table logic.
+  {
+    // Use a fresh client so the local variable doesn't shadow the
+    // 'client' namespace used in the qualified type names below.
+    shared_ptr<KuduClient> new_client;
+    ASSERT_OK(cluster_->CreateClient(nullptr, &new_client));
+    unique_ptr<client::KuduTableAlterer> table_alterer(
+        new_client->NewTableAlterer(kNotEnoughReplicasTableName));
+    table_alterer->AddColumn("new_column")->Type(client::KuduColumnSchema::INT32);
+    ASSERT_OK(table_alterer->Alter());
+  }
+
+  // Only the table with RF=1 should be healthy at this point.
+  {
+    string out;
+    string cmd = Substitute(
+        "cluster ksck $0 --sections=TABLE_SUMMARIES --ksck_format=json_compact", master_address);
+    // The tool's exit status is deliberately ignored: ksck exits with an error
+    // while some tables are under-replicated, but its JSON output is still valid.
+    kudu::tools::RunKuduTool(strings::Split(cmd, " ", strings::SkipEmpty()), &out);
+    rapidjson::Document doc;
+    doc.Parse<0>(out.c_str());
+    ASSERT_EQ(3, doc["table_summaries"].Size());
+    const rapidjson::Value& items = doc["table_summaries"];
+    for (rapidjson::SizeType i = 0; i < items.Size(); i++) {
+      if (string(kOneReplicaTableName) == items[i]["name"].GetString()) {
+        ASSERT_EQ(string("HEALTHY"), items[i]["health"].GetString());
+      } else {
+        ASSERT_EQ(string("UNDER_REPLICATED"), items[i]["health"].GetString());
+      }
+    }
+  }
+
+  if (add_new_ts) {
+    // Add one new tablet server.
+    NO_FATALS(cluster_->AddTabletServer());
+  } else {
+    // Restart the remaining stopped tablet server.
+    NO_FATALS(cluster_->tablet_server(2)->Restart());
+  }
+
+  // All tables should eventually become healthy.
+  {
+    ASSERT_EVENTUALLY([&] {
+      string out;
+      string cmd = Substitute(
+          "cluster ksck $0 --sections=TABLE_SUMMARIES --ksck_format=json_compact", master_address);
+      // ksck exits with an error until every table is healthy, so asserting
+      // on its status makes ASSERT_EVENTUALLY retry until the cluster recovers.
+      ASSERT_OK(kudu::tools::RunKuduTool(strings::Split(cmd, " ", strings::SkipEmpty()), &out));
+      rapidjson::Document doc;
+      doc.Parse<0>(out.c_str());
+      ASSERT_EQ(3, doc["table_summaries"].Size());
+      const rapidjson::Value& items = doc["table_summaries"];
+      for (rapidjson::SizeType i = 0; i < items.Size(); i++) {
+        ASSERT_EQ(string("HEALTHY"), items[i]["health"].GetString());
+      }
+    });
+  }
+}
 } // namespace kudu
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index d54c083..f50c724 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -414,6 +414,12 @@
 TAG_FLAG(require_new_spec_for_custom_hash_schema_range_bound, experimental);
 TAG_FLAG(require_new_spec_for_custom_hash_schema_range_bound, runtime);
 
+DEFINE_bool(allow_creating_under_replicated_tables, false,
+            "Whether to allow creating a tablet even if there are only enough healthy "
+            "tablet servers to place a majority of the tablet's replicas");
+TAG_FLAG(allow_creating_under_replicated_tables, experimental);
+TAG_FLAG(allow_creating_under_replicated_tables, hidden);
+
 DEFINE_uint32(default_deleted_table_reserve_seconds, 0,
               "Time in seconds to be reserved before purging a deleted table for the table "
               "from DeleteTable request. Value 0 means DeleteTable() works the regular way, "
@@ -6011,14 +6017,30 @@
                                                TabletInfo* tablet) {
   DCHECK(tablet);
   TableMetadataLock table_guard(tablet->table().get(), LockMode::READ);
-
   if (!table_guard.data().pb.IsInitialized()) {
     return Status::InvalidArgument(
         Substitute("TableInfo for tablet $0 is not initialized (aborted CreateTable attempt?)",
                    tablet->id()));
   }
 
-  const auto nreplicas = table_guard.data().pb.num_replicas();
+  int nreplicas = table_guard.data().pb.num_replicas();
+  // Try to place just a majority of the tablet's replicas when the number of live
+  // tablet servers is less than the required number of replicas.
+  if (policy.ts_num() < nreplicas &&
+      FLAGS_allow_creating_under_replicated_tables) {
+    // The Raft protocol requires at least (replication_factor / 2) + 1 live replicas
+    // to form a quorum.  Since Kudu places at most one replica of a tablet at one tablet
+    // server, it's necessary to have at least (replication_factor / 2) + 1 live tablet
+    // servers in a cluster to allow the tablet to serve read and write requests.
+    if (policy.ts_num() < consensus::MajoritySize(nreplicas)) {
+      return Status::InvalidArgument(
+          Substitute("need at least $0 out of $1 replicas to form a Raft quorum, "
+                     "but only $2 tablet servers are online",
+                     consensus::MajoritySize(nreplicas), nreplicas, policy.ts_num()));
+    }
+    nreplicas = policy.ts_num();
+  }
+
   if (policy.ts_num() < nreplicas) {
     return Status::InvalidArgument(
         Substitute("Not enough tablet servers are online for table '$0'. Need at least $1 "
@@ -6669,6 +6691,31 @@
         num_ts_needed_for_rereplication, num_live_tservers);
   }
 
+  // Verify that the number of replicas isn't greater than the number of registered
+  // tablet servers in the cluster. This is to make sure the cluster can provide the
+  // expected HA guarantees when all tablet servers are online. Essentially, it
+  // ensures that no tablet stays under-replicated indefinitely once all the
+  // nodes of the cluster are online.
+  TSDescriptorVector registered_ts_descs;
+  master_->ts_manager()->GetAllDescriptors(&registered_ts_descs);
+  if (FLAGS_allow_creating_under_replicated_tables) {
+    if (num_replicas > registered_ts_descs.size()) {
+      return SetupError(Status::InvalidArgument(Substitute(
+          "not enough registered tablet servers to $0 a table with the requested replication "
+          "factor $1; $2 tablet servers are registered",
+          type == ValidateType::kCreateTable ? "create" : "alter",
+          num_replicas, registered_ts_descs.size())),
+                        resp, MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH);
+    }
+    if (num_live_tservers < consensus::MajoritySize(num_replicas)) {
+      return SetupError(Status::InvalidArgument(Substitute(
+          "not enough live tablet servers to $0 a table with the requested replication "
+          "factor $1; $2/$3 tablet servers are alive",
+          type == ValidateType::kCreateTable ? "create" : "alter",
+          num_replicas, num_live_tservers, registered_ts_descs.size())),
+                        resp, MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH);
+    }
+  }
   return Status::OK();
 }