blob: fd8750ae597f646429bb6f762533ba69345a63fc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <ostream>
#include <string>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/strip.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/integration-tests/ts_itest-base.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master_options.h"
#include "kudu/master/sys_catalog.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tools/tool_test_util.h"
#include "kudu/tserver/tablet_server-test-base.h"
#include "kudu/util/cow_object.h"
#include "kudu/util/env.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_int32(num_replicas);
DECLARE_int32(num_tablet_servers);
DECLARE_string(sasl_protocol_name);
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduColumnStorageAttributes;
using kudu::client::KuduInsert;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::client::KuduValue;
using kudu::client::sp::shared_ptr;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalTabletServer;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::consensus::COMMITTED_OPID;
using kudu::consensus::ConsensusStatePB;
using kudu::consensus::EXCLUDE_HEALTH_REPORT;
using kudu::consensus::LeaderStepDownMode;
using kudu::consensus::OpId;
using kudu::itest::FindTabletFollowers;
using kudu::itest::FindTabletLeader;
using kudu::itest::GetConsensusState;
using kudu::itest::ListTablesWithInfo;
using kudu::itest::StartElection;
using kudu::itest::WaitUntilLeader;
using kudu::itest::TabletServerMap;
using kudu::itest::TServerDetails;
using kudu::itest::WAIT_FOR_LEADER;
using kudu::itest::WaitForReplicasReportedToMaster;
using kudu::itest::WaitForServersToAgree;
using kudu::itest::WaitUntilCommittedConfigNumVotersIs;
using kudu::itest::WaitUntilTabletInState;
using kudu::itest::WaitUntilTabletRunning;
using kudu::master::Master;
using kudu::master::MasterOptions;
using kudu::master::SysCatalogTable;
using kudu::master::TableInfo;
using kudu::master::TableInfoLoader;
using kudu::master::TableMetadataLock;
using kudu::master::TabletInfo;
using kudu::master::TabletInfoLoader;
using kudu::master::TabletMetadataGroupLock;
using kudu::master::VOTER_REPLICA;
using kudu::pb_util::SecureDebugString;
using std::back_inserter;
using std::copy;
using std::deque;
using std::endl;
using std::ostringstream;
using std::string;
using std::pair;
using std::tuple;
using std::unique_ptr;
using std::vector;
using strings::Split;
using strings::Substitute;
namespace kudu {
namespace tools {
namespace {

// A do-nothing function with a Status-returning callback shape; it performs
// no work and unconditionally reports success.
Status NoOpCb() { return Status::OK(); }

} // anonymous namespace
// Builds the diagnostic text reported when a tool invocation fails: the
// Status string, then the captured stdout and stderr, each on its own line
// and each terminated by a newline.
static string ToolRunInfo(const Status& s, const string& out, const string& err) {
  string info = s.ToString();
  info += "\n";
  info += "stdout: ";
  info += out;
  info += "\n";
  info += "stderr: ";
  info += err;
  info += "\n";
  return info;
}
// ASSERT_TOOL_OK(args...): runs the kudu CLI tool with the given arguments and
// fatally fails the current test if the tool does not return OK. Example:
//
//   ASSERT_TOOL_OK("cluster", "ksck", master_addrs);
//
// On its own, the failure Status from RunKuduTool rarely explains much, so on
// failure the captured stdout and stderr are appended to the assertion message
// (via ToolRunInfo) to make diagnosis easier.
// TODO(wdberkeley): Add a macro to retrieve stdout or stderr, or a macro for
// when one of those should match a string.
#define ASSERT_TOOL_OK(...) do { \
  const vector<string> _tool_args = { __VA_ARGS__ }; \
  string _tool_out; \
  string _tool_err; \
  const Status _tool_status = RunKuduTool(_tool_args, &_tool_out, &_tool_err); \
  if (!_tool_status.ok()) { \
    FAIL() << ToolRunInfo(_tool_status, _tool_out, _tool_err); \
  } else { \
    SUCCEED(); \
  } \
} while (0)
class AdminCliTest : public tserver::TabletServerIntegrationTestBase {
 protected:
  // Returns the first server of 'tablet_servers_' (in that map's iteration
  // order) whose UUID is not present in 'active_tablet_servers', i.e. a node
  // that remains available to be picked for re-replication. Returns nullptr
  // when every known tablet server is already active.
  const TServerDetails*
  FindNodeForReReplication(const TabletServerMap& active_tablet_servers) const {
    for (const auto& entry : tablet_servers_) {
      const TServerDetails* candidate = entry.second;
      if (!ContainsKey(active_tablet_servers, candidate->uuid())) {
        return candidate;
      }
    }
    return nullptr;
  }
};
// With 3 replicas spread over 5 tablet servers, FindTabletFollowers() must
// report exactly the 2 non-leader replicas.
TEST_F(AdminCliTest, TestFindTabletFollowers) {
  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  TServerDetails* leader_ts = nullptr;
  ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id_, kTimeout, &leader_ts));

  vector<TServerDetails*> follower_list;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(tablet_servers_, tablet_id_, kTimeout, &follower_list));
  });
  ASSERT_EQ(FLAGS_num_replicas - 1, follower_list.size());

  // None of the reported followers may be the leader itself.
  for (const TServerDetails* f : follower_list) {
    ASSERT_NE(leader_ts->uuid(), f->uuid());
  }
}
// FindTabletFollowers() must reject a tablet server map that does not contain
// every peer of the tablet's config.
TEST_F(AdminCliTest, TestFindTabletFollowersWithIncompleteTabletServers) {
  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  TServerDetails* leader_ts = nullptr;
  ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id_, kTimeout, &leader_ts));

  // An incomplete map of tablet servers: it holds the leader and nothing else.
  TabletServerMap leader_only;
  InsertOrDie(&leader_only, leader_ts->uuid(), leader_ts);

  vector<TServerDetails*> follower_list;
  ASSERT_EVENTUALLY([&] {
    const Status status =
        FindTabletFollowers(leader_only, tablet_id_, kTimeout, &follower_list);
    ASSERT_TRUE(status.IsInvalidArgument());
    ASSERT_STR_CONTAINS(
        status.ToString(),
        "Not all peers reported by tablet servers are part of the supplied tablet servers");
  });
}
// Test config change while running a workload.
// 1. Instantiate external mini cluster with 3 TS.
// 2. Create table with 2 replicas.
// 3. Invoke CLI to trigger a config change.
// 4. Wait until the new server bootstraps, then verify the written data.
// 5. Remove the added replica again and wait for the config to shrink back.
TEST_F(AdminCliTest, TestChangeConfig) {
  const vector<string> kMasterFlags = {
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
    "--allow_unsafe_replication_factor=true",

    // If running with the 3-4-3 replication scheme, the catalog manager removes
    // excess replicas, so it's necessary to disable that default behavior
    // since this test manages replicas on its own.
    "--catalog_manager_evict_excess_replicas=false",
  };
  const vector<string> kTserverFlags = {
    // Leadership is established explicitly by the StartElection() call below.
    "--enable_leader_failure_detection=false",
  };

  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 2;
  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));

  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());

  // 'tablet_replicas_' holds one entry per replica, keyed by tablet ID. Look
  // only at the replicas of 'tablet_id_' and make sure there are exactly
  // FLAGS_num_replicas of them before stepping the iterator forward.
  TabletServerMap active_tablet_servers;
  const auto replica_range = tablet_replicas_.equal_range(tablet_id_);
  ASSERT_EQ(FLAGS_num_replicas,
            std::distance(replica_range.first, replica_range.second));
  auto iter = replica_range.first;
  // There is no leader yet: the first replica becomes the leader through the
  // explicit election below.
  TServerDetails* leader = iter->second;
  TServerDetails* follower = (++iter)->second;
  InsertOrDie(&active_tablet_servers, leader->uuid(), leader);
  InsertOrDie(&active_tablet_servers, follower->uuid(), follower);

  // The tablet server that does not host a replica yet is the one to add.
  TServerDetails* new_node = nullptr;
  for (TServerDetails* ts : tservers) {
    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
      new_node = ts;
      break;
    }
  }
  ASSERT_NE(nullptr, new_node);

  // Elect the leader (still only a consensus config size of 2).
  ASSERT_OK(StartElection(leader, tablet_id_, MonoDelta::FromSeconds(10)));
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers,
                                  tablet_id_, 1));

  TestWorkload workload(cluster_.get());
  workload.set_table_name(kTableId);
  workload.set_timeout_allowed(true);
  workload.set_write_timeout_millis(10000);
  workload.set_num_replicas(FLAGS_num_replicas);
  workload.set_num_write_threads(1);
  workload.set_write_batch_size(1);
  workload.Setup();
  workload.Start();

  // Wait until the Master knows about the leader tserver.
  TServerDetails* master_observed_leader;
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());

  LOG(INFO) << "Adding replica at tserver with UUID "
            << new_node->uuid() << " as VOTER...";
  ASSERT_TOOL_OK(
    "tablet",
    "change_config",
    "add_replica",
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_,
    new_node->uuid(),
    "VOTER"
  );

  InsertOrDie(&active_tablet_servers, new_node->uuid(), new_node);
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(),
                                                leader, tablet_id_,
                                                MonoDelta::FromSeconds(10)));

  workload.StopAndJoin();
  int num_batches = workload.batches_completed();

  LOG(INFO) << "Waiting for replicas to agree...";
  // Wait for all servers to replicate everything up through the last write op.
  // Since the write batch size is 1, each completed batch accounts for at
  // least one log entry; add the initial leader's NO_OP and the config change
  // that added the replica, for a minimum log index of #batches + 2.
  int min_log_index = num_batches + 2;
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30),
                                  active_tablet_servers, tablet_id_,
                                  min_log_index));

  int rows_inserted = workload.rows_inserted();
  LOG(INFO) << "Number of rows inserted: " << rows_inserted;

  ClusterVerifier v(cluster_.get());
  NO_FATALS(v.CheckCluster());
  NO_FATALS(v.CheckRowCount(kTableId, ClusterVerifier::AT_LEAST, rows_inserted));

  // Now remove the server.
  LOG(INFO) << "Removing replica at tserver with UUID "
            << new_node->uuid() << " from the config...";
  ASSERT_TOOL_OK(
    "tablet",
    "change_config",
    "remove_replica",
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_,
    new_node->uuid()
  );

  ASSERT_EQ(1, active_tablet_servers.erase(new_node->uuid()));
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(),
                                                leader, tablet_id_,
                                                MonoDelta::FromSeconds(10)));
}
// Which tablet server, if any, MoveTabletParamTest shuts down before it tries
// to move a replica.
enum class DownTS {
// No tablet server is shut down; the move(s) must succeed.
None,
// A tablet server that hosts a replica of the tablet (not the one being moved
// away from) is shut down; the move is expected to fail with RuntimeError.
TabletPeer,
// Regression case for KUDU-2331.
// A tablet server that hosts no replica of the tablet is shut down; the move
// must still succeed.
UninvolvedTS,
};
// Parameterized on (1) whether the KUDU-1097 behavior
// (--raft_prepare_replacement_before_eviction) is enabled and (2) which tablet
// server, if any, is down while a replica is moved.
class MoveTabletParamTest
    : public AdminCliTest,
      public ::testing::WithParamInterface<tuple<Kudu1097, DownTS>> {};
TEST_P(MoveTabletParamTest, Test) {
  const MonoDelta timeout = MonoDelta::FromSeconds(30);

  const auto& param = GetParam();
  const auto enable_kudu_1097 = std::get<0>(param);
  const auto downTS = std::get<1>(param);

  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;

  vector<string> ts_flags, master_flags;
  ts_flags = master_flags = {
      Substitute("--raft_prepare_replacement_before_eviction=$0",
                 enable_kudu_1097 == Kudu1097::Enable) };
  NO_FATALS(BuildAndStart(ts_flags, master_flags));

  vector<string> tservers;
  AppendKeysFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());

  // UUIDs of the servers hosting a replica of 'tablet_id_'. equal_range()
  // restricts the walk to this tablet's entries of 'tablet_replicas_' rather
  // than running from the first match to the end of the whole container.
  deque<string> active_tservers;
  const auto replica_range = tablet_replicas_.equal_range(tablet_id_);
  for (auto iter = replica_range.first; iter != replica_range.second; ++iter) {
    active_tservers.push_back(iter->second->uuid());
  }
  ASSERT_EQ(FLAGS_num_replicas, active_tservers.size());

  // The servers without a replica, computed as a sorted set difference.
  deque<string> inactive_tservers;
  std::sort(tservers.begin(), tservers.end());
  std::sort(active_tservers.begin(), active_tservers.end());
  std::set_difference(tservers.cbegin(), tservers.cend(),
                      active_tservers.cbegin(), active_tservers.cend(),
                      std::back_inserter(inactive_tservers));
  ASSERT_EQ(FLAGS_num_tablet_servers - FLAGS_num_replicas, inactive_tservers.size());

  // The workload is light (1 thread, 1 op batches) so that new replicas
  // bootstrap and converge quickly.
  TestWorkload workload(cluster_.get());
  workload.set_table_name(kTableId);
  workload.set_num_replicas(FLAGS_num_replicas);
  workload.set_num_write_threads(1);
  workload.set_write_batch_size(1);
  workload.set_write_timeout_millis(timeout.ToMilliseconds());
  workload.Setup();
  workload.Start();

  if (downTS == DownTS::TabletPeer) {
    // To test that the move fails if any peer is down, when downTS is
    // 'TabletPeer', shut down a server that hosts a replica but is not the
    // 'remove' server: 'remove' is the first active server, this is the last.
    NO_FATALS(cluster_->tablet_server_by_uuid(active_tservers.back())->Shutdown());
  } else if (downTS == DownTS::UninvolvedTS) {
    // Regression case for KUDU-2331, where move_replica would fail if any tablet
    // server is down, even if that tablet server was not involved in the move.
    NO_FATALS(cluster_->tablet_server_by_uuid(inactive_tservers.back())->Shutdown());
  }

  // If we're not bringing down a tablet server (and slow tests are allowed),
  // do 3 moves. Assuming no ad hoc leadership changes, 3 guarantees the leader
  // is moved at least once.
  int num_moves = AllowSlowTests() && (downTS == DownTS::None) ? 3 : 1;
  for (int i = 0; i < num_moves; i++) {
    const string remove = active_tservers.front();
    const string add = inactive_tservers.front();
    vector<string> tool_command = {
      "tablet",
      "change_config",
      "move_replica",
      cluster_->master()->bound_rpc_addr().ToString(),
      tablet_id_,
      remove,
      add,
    };

    // Not named 'stdout'/'stderr': those are macros defined by <cstdio>.
    string out, err;
    Status s = RunKuduTool(tool_command, &out, &err);
    if (downTS == DownTS::TabletPeer) {
      ASSERT_TRUE(s.IsRuntimeError());
      workload.StopAndJoin();
      return;
    }
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);

    active_tservers.pop_front();
    active_tservers.push_back(add);
    inactive_tservers.pop_front();
    inactive_tservers.push_back(remove);

    // Allow the added server time to catch up so it applies the newest configuration.
    // If we don't wait, the initial ksck of move_tablet can fail with consensus conflict.
    TabletServerMap active_tservers_map;
    for (const string& uuid : active_tservers) {
      InsertOrDie(&active_tservers_map, uuid, tablet_servers_[uuid]);
    }
    ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(/*num_voters=*/ FLAGS_num_replicas,
                                                  active_tservers_map[add],
                                                  tablet_id_, timeout));
    NO_FATALS(WaitUntilCommittedConfigNumMembersIs(/*num_members=*/ FLAGS_num_replicas,
                                                   active_tservers_map[add],
                                                   tablet_id_, timeout));
  }
  workload.StopAndJoin();
  NO_FATALS(cluster_->AssertNoCrashes());

  // If a tablet server is down, we need to skip the ClusterVerifier.
  if (downTS == DownTS::None) {
    ClusterVerifier v(cluster_.get());
    NO_FATALS(v.CheckCluster());
  }
}
// Instantiates MoveTabletParamTest for every combination (::testing::Combine)
// of Kudu1097::{Disable, Enable} and DownTS::{None, TabletPeer, UninvolvedTS},
// i.e. 2 x 3 = 6 parameter tuples.
INSTANTIATE_TEST_SUITE_P(EnableKudu1097AndDownTS, MoveTabletParamTest,
::testing::Combine(::testing::Values(Kudu1097::Disable,
Kudu1097::Enable),
::testing::Values(DownTS::None,
DownTS::TabletPeer,
DownTS::UninvolvedTS)));
// Runs `kudu remote_replica unsafe_change_config <dst_host> <tablet_id>
// <peer_uuid>...` so that the replica of 'tablet_id' on the tablet server at
// 'dst_host' is forced onto a config consisting of 'peer_uuid_list'.
// Returns the tool's exit status.
Status RunUnsafeChangeConfig(const string& tablet_id,
                             const string& dst_host,
                             const vector<string>& peer_uuid_list) {
  vector<string> args = { "remote_replica", "unsafe_change_config", dst_host, tablet_id };
  args.insert(args.end(), peer_uuid_list.begin(), peer_uuid_list.end());
  return RunKuduTool(args);
}
// Test unsafe config change when there is one follower survivor in the cluster.
// 1. Instantiate external mini cluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down leader and follower1.
// 3. Trigger unsafe config change on follower2 having follower2 in the config.
// 4. Wait until the new config is populated on follower2(new leader) and master.
// 5. Bring up leader and follower1 and verify their replicas are tombstoned (tablet state STOPPED).
// 6. Verify that new config doesn't contain old leader and follower1.
TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  LOG(INFO) << "Finding tablet leader and waiting for things to start...";
  const string tablet_id = tablet_replicas_.begin()->first;

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  LOG(INFO) << "Waiting for Master to see the current replicas...";
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id, kTimeout,
                                            WAIT_FOR_LEADER,
                                            VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id, kTimeout, &followers));
  });
  // followers[0] and followers[1] are indexed below.
  ASSERT_EQ(2, followers.size());
  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id, COMMITTED_OPID, kTimeout));

  // Shut down master so it doesn't interfere while we shut down the leader and
  // one of the other followers.
  cluster_->master()->Shutdown();
  cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();

  LOG(INFO) << "Forcing unsafe config change on remaining follower " << followers[0]->uuid();
  const string follower0_addr =
      cluster_->tablet_server_by_uuid(followers[0]->uuid())->bound_rpc_addr().ToString();
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id, follower0_addr, { followers[0]->uuid() }));
  ASSERT_OK(WaitUntilLeader(followers[0], tablet_id, kTimeout));
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(1, followers[0], tablet_id, kTimeout));

  LOG(INFO) << "Restarting master...";
  // Restart master so it can re-replicate the tablet to remaining tablet servers.
  ASSERT_OK(cluster_->master()->Restart());

  // Wait for master to re-replicate.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, followers[0], tablet_id, kTimeout));
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  OpId opid;
  ASSERT_OK(WaitForOpFromCurrentTerm(followers[0], tablet_id, COMMITTED_OPID, kTimeout, &opid));

  // Rebuild the active set from the replicas the master now reports.
  active_tablet_servers.clear();
  for (const auto& loc : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(loc.ts_info_idx()).permanent_uuid();
    InsertOrDie(&active_tablet_servers, uuid, tablet_servers_[uuid]);
  }
  ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id, opid.index()));

  // Verify that two new servers are part of new config and old
  // servers are gone.
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, followers[1]->uuid());
    ASSERT_NE(uuid, leader_ts->uuid());
  }

  // Also verify that when we bring back followers[1] and leader, their
  // replicas of the tablet are tombstoned; the checks below wait for the
  // tablet state STOPPED on these servers.
  ASSERT_OK(cluster_->tablet_server_by_uuid(leader_ts->uuid())->Restart());
  ASSERT_OK(cluster_->tablet_server_by_uuid(followers[1]->uuid())->Restart());
  ASSERT_OK(WaitUntilTabletInState(leader_ts, tablet_id, tablet::STOPPED, kTimeout));
  ASSERT_OK(WaitUntilTabletInState(followers[1], tablet_id, tablet::STOPPED, kTimeout));
}
// Test unsafe config change when there is one leader survivor in the cluster.
// 1. Instantiate external mini cluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down both followers.
// 3. Trigger unsafe config change on leader having leader in the config.
// 4. Wait until the new config is populated on leader and master.
// 5. Verify that new config does not contain old followers.
TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleLeader) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id_);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  LOG(INFO) << "Waiting for Master to see the current replicas...";
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
  });
  // followers[0] and followers[1] are indexed below.
  ASSERT_EQ(2, followers.size());
  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout));

  // Shut down servers follower1 and follower2,
  // so that we can force new config on remaining leader.
  cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
  // Restart master to cleanup cache of dead servers from its list of candidate
  // servers to trigger placement of new replicas on healthy servers.
  cluster_->master()->Shutdown();
  ASSERT_OK(cluster_->master()->Restart());

  LOG(INFO) << "Forcing unsafe config change on tserver " << leader_ts->uuid();
  const string leader_addr = Substitute("$0:$1",
                                        leader_ts->registration.rpc_addresses(0).host(),
                                        leader_ts->registration.rpc_addresses(0).port());
  const vector<string> peer_uuid_list = { leader_ts->uuid() };
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));

  // Check that new config is populated to a new follower: the first tablet
  // server that was not part of the original config.
  const TServerDetails* new_follower = FindNodeForReReplication(active_tablet_servers);
  ASSERT_TRUE(new_follower != nullptr);

  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
  // so it is safer to wait until consensus metadata has 3 voters on new_follower.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_follower, tablet_id_, kTimeout));

  // Wait for the master to be notified of the config change.
  LOG(INFO) << "Waiting for Master to see new config...";
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, followers[0]->uuid());
    ASSERT_NE(uuid, followers[1]->uuid());
  }
}
// Test unsafe config change when the unsafe config contains 2 nodes.
// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down leader.
// 3. Trigger unsafe config change on follower1 having follower1 and follower2 in the config.
// 4. Wait until the new config is populated on new_leader and master.
// 5. Verify that new config does not contain old leader.
TEST_F(AdminCliTest, TestUnsafeChangeConfigForConfigWithTwoNodes) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 4;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id_);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  LOG(INFO) << "Waiting for Master to see the current replicas...";
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
  });
  // followers[0] and followers[1] are indexed below.
  ASSERT_EQ(2, followers.size());
  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout));

  // Shut down leader and prepare 2-node config.
  cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
  // Restart master to cleanup cache of dead servers from its list of candidate
  // servers to trigger placement of new replicas on healthy servers.
  cluster_->master()->Shutdown();
  ASSERT_OK(cluster_->master()->Restart());

  LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid();
  const string follower1_addr = Substitute("$0:$1",
                                           followers[1]->registration.rpc_addresses(0).host(),
                                           followers[1]->registration.rpc_addresses(0).port());
  const vector<string> peer_uuid_list = { followers[0]->uuid(),
                                          followers[1]->uuid(), };
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list));

  const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers);
  ASSERT_TRUE(new_node != nullptr);

  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
  // so it is safer to wait until consensus metadata has 3 voters on new_node.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));

  // Wait for the master to be notified of the config change.
  LOG(INFO) << "Waiting for Master to see new config...";
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, leader_ts->uuid());
  }
}
// Test unsafe config change on a 5-replica tablet when the unsafe config contains 2 nodes.
// 1. Instantiate external minicluster with 1 tablet having 5 replicas and 8 TS.
// 2. Shut down leader and 2 followers.
// 3. Trigger unsafe config change on a surviving follower with those
// 2 surviving followers in the new config.
// 4. Wait until the new config is populated on new_leader and master.
// 5. Verify that new config does not contain old leader and old followers.
TEST_F(AdminCliTest, TestUnsafeChangeConfigWithFiveReplicaConfig) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  // 8 tablet servers for a 5-replica tablet: after 3 replicas' servers are
  // shut down, the 3 servers outside the original config remain available
  // for re-replication.
  FLAGS_num_tablet_servers = 8;
  FLAGS_num_replicas = 5;
  NO_FATALS(BuildAndStart());

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id_);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  LOG(INFO) << "Waiting for Master to see the current replicas...";
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            5, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
  });
  ASSERT_EQ(4, followers.size());
  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout));

  // Lose 3 of the 5 replicas: the leader and two of the followers.
  cluster_->tablet_server_by_uuid(followers[2]->uuid())->Shutdown();
  cluster_->tablet_server_by_uuid(followers[3]->uuid())->Shutdown();
  cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
  // Restart master to cleanup cache of dead servers from its list of candidate
  // servers to trigger placement of new replicas on healthy servers.
  cluster_->master()->Shutdown();
  ASSERT_OK(cluster_->master()->Restart());

  LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid();
  const string follower1_addr = Substitute("$0:$1",
                                           followers[1]->registration.rpc_addresses(0).host(),
                                           followers[1]->registration.rpc_addresses(0).port());
  const vector<string> peer_uuid_list = { followers[0]->uuid(),
                                          followers[1]->uuid(), };
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list));

  const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers);
  ASSERT_TRUE(new_node != nullptr);

  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
  // so it is safer to wait until consensus metadata has 5 voters back on new_node.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(5, new_node, tablet_id_, kTimeout));

  // Wait for the master to be notified of the config change.
  LOG(INFO) << "Waiting for Master to see new config...";
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            5, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, leader_ts->uuid());
    ASSERT_NE(uuid, followers[2]->uuid());
    ASSERT_NE(uuid, followers[3]->uuid());
  }
}
// Test unsafe config change when there is a pending config on a surviving leader.
// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down both the followers.
// 3. Trigger a regular config change on the leader which remains pending on leader.
// 4. Trigger unsafe config change on the surviving leader.
// 5. Wait until the new config is populated on a re-replication target node and
//    on the master.
// 6. Verify that new config does not contain old followers and a standby node
//    has populated the new config.
TEST_F(AdminCliTest, TestUnsafeChangeConfigLeaderWithPendingConfig) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id_);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  // The follower lookup is retried via ASSERT_EVENTUALLY since it may not
  // succeed on the first attempt.
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
  });
  // Both followers are indexed below, so make sure exactly two were found.
  ASSERT_EQ(2, followers.size());

  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout));

  // Shut down servers follower1 and follower2,
  // so that leader can't replicate future config change ops.
  cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();

  // Now try to replicate a ChangeConfig operation. This should get stuck and
  // time out because the server can't replicate any operations.
  // NOTE(review): the trailing -1 presumably means "no CAS config index";
  // confirm against RemoveServer().
  const auto s = RemoveServer(
      leader_ts, tablet_id_, followers[1], MonoDelta::FromSeconds(2), -1);
  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();

  // With the RemoveServer change still pending, force the leader into a
  // config that contains only itself.
  const string& leader_addr = Substitute("$0:$1",
                                         leader_ts->registration.rpc_addresses(0).host(),
                                         leader_ts->registration.rpc_addresses(0).port());
  const vector<string> peer_uuid_list = { leader_ts->uuid() };
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));

  // Restart master to cleanup cache of dead servers from its list of candidate
  // servers to trigger placement of new replicas on healthy servers.
  cluster_->master()->Shutdown();
  ASSERT_OK(cluster_->master()->Restart());

  // NOTE(review): FindNodeForReReplication() presumably returns a tablet
  // server that is not in 'active_tablet_servers'; confirm in the fixture.
  const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers);
  ASSERT_NE(nullptr, new_node);

  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
  // so it is safer to wait until consensus metadata has 3 voters on new_node.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));

  // Wait for the master to be notified of the config change.
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  // Neither shut-down follower may be part of the config the master reports.
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, followers[0]->uuid());
    ASSERT_NE(uuid, followers[1]->uuid());
  }
}
// Test unsafe config change when there is a pending config on a surviving follower.
// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down both the followers.
// 3. Trigger a regular config change on the leader which remains pending on leader.
// 4. Trigger a leader_step_down command such that leader is forced to become follower.
// 5. Trigger unsafe config change on the former leader (now expected to be a follower).
// 6. Wait until the new config is populated on a re-replication target node and
//    on the master.
// 7. Verify that new config does not contain old followers and a standby node
//    has populated the new config.
TEST_F(AdminCliTest, TestUnsafeChangeConfigFollowerWithPendingConfig) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id_);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  LOG(INFO) << "Waiting for Master to see the current replicas...";
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
  });
  // followers[0] and followers[1] are dereferenced below: guard against
  // reading past the end of the vector.
  ASSERT_EQ(2, followers.size());

  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout));

  // Shut down servers follower1 and follower2,
  // so that leader can't replicate future config change ops.
  cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();

  // Restart master to cleanup cache of dead servers from its
  // list of candidate servers to place the new replicas.
  cluster_->master()->Shutdown();
  ASSERT_OK(cluster_->master()->Restart());

  // Now try to replicate a ChangeConfig operation. This should get stuck and time out
  // because the server can't replicate any operations.
  Status s = RemoveServer(leader_ts, tablet_id_, followers[1],
                          MonoDelta::FromSeconds(2), -1);
  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();

  // Force leader to step down, best effort command since the leadership
  // could change anytime during cluster lifetime.
  string stderr;
  s = RunKuduTool({
    "tablet",
    "leader_step_down",
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_
  }, nullptr, &stderr);
  const bool not_currently_leader = stderr.find(
      Status::IllegalState("").CodeAsString()) != string::npos;
  ASSERT_TRUE(s.ok() || not_currently_leader) << "stderr: " << stderr;

  LOG(INFO) << "Change Config Op timed out, Sending a Replace config "
            << "command when change config op is pending on the leader.";
  const string leader_addr = Substitute("$0:$1",
                                        leader_ts->registration.rpc_addresses(0).host(),
                                        leader_ts->registration.rpc_addresses(0).port());
  const vector<string> peer_uuid_list = { leader_ts->uuid() };
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));

  const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers);
  ASSERT_TRUE(new_node != nullptr);

  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
  // so it is safer to wait until consensus metadata has 3 voters on new_node.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));

  // Wait for the master to be notified of the config change.
  LOG(INFO) << "Waiting for Master to see new config...";
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  // Neither shut-down follower may be part of the config the master reports.
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, followers[1]->uuid());
    ASSERT_NE(uuid, followers[0]->uuid());
  }
}
// Test unsafe config change when there are back to back pending configs on leader logs.
// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down both the followers.
// 3. Trigger a regular config change on the leader which remains pending on leader.
// 4. Trigger unsafe config change on the surviving leader, so that two config
//    changes are pending back to back in the leader's log.
// 5. Set the fault_crash_before_cmeta_flush flag on the leader and wait for the
//    injected crash (presumably hit on the next consensus-metadata flush while
//    the pending configs are being committed).
// 6. Shutdown and restart the leader and verify that tablet bootstrapped on leader.
// 7. Verify that a new node has populated the new config with 3 voters.
TEST_F(AdminCliTest, TestUnsafeChangeConfigWithPendingConfigsOnWAL) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id_);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  LOG(INFO) << "Waiting for Master to see the current replicas...";
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
  });
  // followers[0] and followers[1] are dereferenced below: guard against
  // reading past the end of the vector.
  ASSERT_EQ(2, followers.size());

  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout));

  // Shut down servers follower1 and follower2,
  // so that leader can't replicate future config change ops.
  cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();

  // Now try to replicate a ChangeConfig operation. This should get stuck and time out
  // because the server can't replicate any operations.
  Status s = RemoveServer(leader_ts, tablet_id_, followers[1],
                          MonoDelta::FromSeconds(2), -1);
  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();

  LOG(INFO) << "Change Config Op timed out, Sending a Replace config "
            << "command when change config op is pending on the leader.";
  const string leader_addr = Substitute("$0:$1",
                                        leader_ts->registration.rpc_addresses(0).host(),
                                        leader_ts->registration.rpc_addresses(0).port());
  const vector<string> peer_uuid_list = { leader_ts->uuid() };
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));

  // Inject the crash via fault_crash_before_cmeta_flush flag.
  // Tablet will find 2 pending configs back to back during bootstrap,
  // one from ChangeConfig (RemoveServer) and another from UnsafeChangeConfig.
  ASSERT_OK(cluster_->SetFlag(
      cluster_->tablet_server_by_uuid(leader_ts->uuid()),
      "fault_crash_before_cmeta_flush", "1.0"));

  const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers);
  ASSERT_TRUE(new_node != nullptr);

  // Restart master to cleanup cache of dead servers from its list of candidate
  // servers to trigger placement of new replicas on healthy servers.
  cluster_->master()->Shutdown();
  ASSERT_OK(cluster_->master()->Restart());

  // The leader must crash on the injected fault, then bootstrap cleanly on
  // restart despite the back-to-back pending configs in its WAL.
  ASSERT_OK(cluster_->tablet_server_by_uuid(
      leader_ts->uuid())->WaitForInjectedCrash(kTimeout));
  cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
  ASSERT_OK(cluster_->tablet_server_by_uuid(
      leader_ts->uuid())->Restart());
  ASSERT_OK(WaitForNumTabletsOnTS(leader_ts, 1, kTimeout, nullptr));
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));

  // Wait for the master to be notified of the config change.
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  // Neither shut-down follower may be part of the config the master reports.
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, followers[0]->uuid());
    ASSERT_NE(uuid, followers[1]->uuid());
  }
}
// Test unsafe config change on a 5-replica tablet when there are multiple
// pending configs on the surviving node.
// 1. Instantiate external minicluster with 1 tablet having 5 replicas and 9 TS.
// 2. Shut down all the followers.
// 3. Trigger unsafe config changes on the surviving leader with those
//    dead followers in the new config.
// 4. Wait until the new config is populated on the master and on a newly
//    added replica.
// 5. Verify that new config does not contain old followers.
TEST_F(AdminCliTest, TestUnsafeChangeConfigWithMultiplePendingConfigs) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 9;
  FLAGS_num_replicas = 5;
  NO_FATALS(BuildAndStart());

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id_);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  LOG(INFO) << "Waiting for Master to see the current replicas...";
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            5, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));

  // Every non-leader replica is a follower: remember it and shut it down, so
  // the leader is the only live member of the config.
  vector<TServerDetails*> followers;
  for (const auto& elem : active_tablet_servers) {
    if (elem.first == leader_ts->uuid()) {
      continue;
    }
    followers.push_back(elem.second);
    cluster_->tablet_server_by_uuid(elem.first)->Shutdown();
  }
  ASSERT_EQ(4, followers.size());

  // Shutdown master to cleanup cache of dead servers from its list of candidate
  // servers to trigger placement of new replicas on healthy servers when we restart later.
  cluster_->master()->Shutdown();

  const string leader_addr = Substitute("$0:$1",
                                        leader_ts->registration.rpc_addresses(0).host(),
                                        leader_ts->registration.rpc_addresses(0).port());

  // This should keep the multiple pending configs on the node since we are
  // adding all the dead followers to the new config, and then eventually we write
  // just one surviving node to the config.
  // New config write sequences are: {ABCDE}, {ABCD}, {ABC}, {AB}, {A},
  // A being the leader node where config is written and rest of the nodes are
  // dead followers.
  for (int num_replicas = static_cast<int>(followers.size());
       num_replicas >= 0; num_replicas--) {
    vector<string> peer_uuid_list;
    peer_uuid_list.push_back(leader_ts->uuid());
    for (int i = 0; i < num_replicas; i++) {
      peer_uuid_list.push_back(followers[i]->uuid());
    }
    ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
  }
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(1, leader_ts, tablet_id_, kTimeout));
  ASSERT_OK(cluster_->master()->Restart());

  const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers);
  ASSERT_TRUE(new_node != nullptr);

  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
  // so it is safer to wait until consensus metadata has 5 voters on new_node.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(5, new_node, tablet_id_, kTimeout));

  // Wait for the master to be notified of the config change.
  LOG(INFO) << "Waiting for Master to see new config...";
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            5, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    // The replica's uuid does not depend on the follower being compared, so
    // resolve it once per replica.
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    // Verify that old followers aren't part of new config.
    for (const auto& old_follower : followers) {
      ASSERT_NE(uuid, old_follower->uuid());
    }
  }
}
// Scans 'tservers' in order and, for the first one whose consensus state names
// a leader that belongs to the committed config and also carries a current
// term, stores that term in '*current_term' and returns OK.
// Any error from GetConsensusState() is returned immediately. If no server
// qualifies, returns NotFound.
Status GetTermFromConsensus(const vector<TServerDetails*>& tservers,
                            const string& tablet_id,
                            int64_t *current_term) {
  const MonoDelta kConsensusStateTimeout = MonoDelta::FromSeconds(10);
  ConsensusStatePB cstate;
  for (auto* server : tservers) {
    RETURN_NOT_OK(GetConsensusState(server, tablet_id, kConsensusStateTimeout,
                                    EXCLUDE_HEALTH_REPORT, &cstate));
    const bool leader_is_member =
        !cstate.leader_uuid().empty() &&
        IsRaftConfigMember(cstate.leader_uuid(), cstate.committed_config());
    if (!leader_is_member || !cstate.has_current_term()) {
      continue;  // No usable leader/term on this server; try the next one.
    }
    *current_term = cstate.current_term();
    return Status::OK();
  }
  return Status::NotFound(Substitute(
      "No leader replica found for tablet $0", tablet_id));
}
// An abrupt leader step-down, with leader failure detection disabled, must leave
// the tablet without any leader.
TEST_F(AdminCliTest, TestAbruptLeaderStepDown) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
  const vector<string> kMasterFlags = {
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
  };
  const vector<string> kTserverFlags = {
    "--enable_leader_failure_detection=false",
  };
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));

  // Wait for the tablet to be running.
  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
  for (auto& ts : tservers) {
    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
  }

  // Elect the leader and wait for the tservers and master to see the leader.
  const auto* leader = tservers[0];
  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
                                  tablet_id_, /*minimum_index=*/1));
  TServerDetails* master_observed_leader;
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());

  // Ask the leader to step down.
  string stderr;
  Status s = RunKuduTool({
    "tablet",
    "leader_step_down",
    "--abrupt",
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_
  }, nullptr, &stderr);
  // Fail here with the tool's own diagnostics, rather than later with a
  // misleading "should not be the leader" failure from the loop below.
  ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr;

  // There shouldn't be a leader now, since failure detection is disabled.
  for (const auto* ts : tservers) {
    s = GetReplicaStatusAndCheckIfLeader(ts, tablet_id_, kTimeout);
    ASSERT_TRUE(s.IsIllegalState()) << "Expected IllegalState because replica "
                                       "should not be the leader: " << s.ToString();
  }
}
// A graceful leader step-down transfers leadership to the requested peer when
// --new_leader_uuid is given. Without it, leadership moves to some other voter.
TEST_F(AdminCliTest, TestGracefulLeaderStepDown) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
  const vector<string> kMasterFlags = {
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
  };
  const vector<string> kTserverFlags = {
    "--enable_leader_failure_detection=false",
  };
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));

  // Wait for the tablet to be running.
  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
  for (auto& ts : tservers) {
    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
  }

  // Elect the leader and wait for the tservers and master to see the leader.
  const auto* leader = tservers[0];
  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
                                  tablet_id_, /*minimum_index=*/1));
  TServerDetails* master_observed_leader;
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());

  // Ask the leader to transfer leadership to a specific peer.
  const auto new_leader_uuid = tservers[1]->uuid();
  string stderr;
  Status s = RunKuduTool({
    "tablet",
    "leader_step_down",
    Substitute("--new_leader_uuid=$0", new_leader_uuid),
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_
  }, nullptr, &stderr);
  ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr;

  // Eventually, the chosen node should become leader.
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
    ASSERT_EQ(new_leader_uuid, master_observed_leader->uuid());
  });

  // Ask the leader to transfer leadership. Reset the buffer so a failure
  // report only shows this invocation's stderr.
  stderr.clear();
  s = RunKuduTool({
    "tablet",
    "leader_step_down",
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_
  }, nullptr, &stderr);
  ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr;

  // Eventually, some other node should become leader.
  const std::unordered_set<string> possible_new_leaders = { tservers[0]->uuid(),
                                                            tservers[2]->uuid() };
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
    ASSERT_TRUE(ContainsKey(possible_new_leaders, master_observed_leader->uuid()));
  });
}
// Leader should reject requests to transfer leadership to a non-member of the
// config.
TEST_F(AdminCliTest, TestLeaderTransferToEvictedPeer) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
  // In this test, tablet leadership is manually controlled and the master
  // should not rereplicate.
  const vector<string> kMasterFlags = {
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
    "--master_add_server_when_underreplicated=false",
  };
  const vector<string> kTserverFlags = {
    "--enable_leader_failure_detection=false",
  };
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));

  // Wait for the tablet to be running.
  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
  for (auto& ts : tservers) {
    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
  }

  // Elect the leader and wait for the tservers and master to see the leader.
  const auto* leader = tservers[0];
  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
                                  tablet_id_, /*minimum_index=*/1));
  TServerDetails* master_observed_leader;
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());
  // The reference binds to the temporary returned by ToString(), which
  // lifetime extension keeps alive for the rest of the test.
  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();

  // Evict the first follower.
  string stderr;
  const auto evicted_uuid = tservers[1]->uuid();
  Status s = RunKuduTool({
    "tablet",
    "change_config",
    "remove_replica",
    master_addr,
    tablet_id_,
    evicted_uuid,
  }, nullptr, &stderr);
  ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr;

  // Ask the leader to transfer leadership to the evicted peer.
  stderr.clear();
  s = RunKuduTool({
    "tablet",
    "leader_step_down",
    Substitute("--new_leader_uuid=$0", evicted_uuid),
    master_addr,
    tablet_id_
  }, nullptr, &stderr);
  // The tool must fail, and its stderr must name the evicted server as a
  // non-voter.
  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString() << " stderr: " << stderr;
  ASSERT_STR_CONTAINS(stderr,
                      Substitute("tablet server $0 is not a voter in the active config",
                                 evicted_uuid));
}
// Leader should reject requests to transfer leadership to a non-voter of the
// config.
TEST_F(AdminCliTest, TestLeaderTransferToNonVoter) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
  // In this test, tablet leadership is manually controlled and the master
  // should not rereplicate.
  const vector<string> kMasterFlags = {
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
    "--master_add_server_when_underreplicated=false",
  };
  const vector<string> kTserverFlags = {
    "--enable_leader_failure_detection=false",
  };
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));

  // Wait for the tablet to be running.
  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
  for (auto& ts : tservers) {
    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
  }

  // Elect the leader and wait for the tservers and master to see the leader.
  const auto* leader = tservers[0];
  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
                                  tablet_id_, /*minimum_index=*/1));
  TServerDetails* master_observed_leader;
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());
  // The reference binds to the temporary returned by ToString(), which
  // lifetime extension keeps alive for the rest of the test.
  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();

  // Demote the first follower to a non-voter.
  string stderr;
  const auto non_voter_uuid = tservers[1]->uuid();
  Status s = RunKuduTool({
    "tablet",
    "change_config",
    "change_replica_type",
    master_addr,
    tablet_id_,
    non_voter_uuid,
    "NON_VOTER",
  }, nullptr, &stderr);
  ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr;

  // Ask the leader to transfer leadership to the non-voter.
  stderr.clear();
  s = RunKuduTool({
    "tablet",
    "leader_step_down",
    Substitute("--new_leader_uuid=$0", non_voter_uuid),
    master_addr,
    tablet_id_
  }, nullptr, &stderr);
  // The tool must fail, and its stderr must name the demoted server as a
  // non-voter.
  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString() << " stderr: " << stderr;
  ASSERT_STR_CONTAINS(stderr,
                      Substitute("tablet server $0 is not a voter in the active config",
                                 non_voter_uuid));
}
// Leader transfer causes the tablet to stop accepting new writes. This test
// tests that writes can still succeed even if lots of leader transfers and
// abrupt stepdowns are happening, as long as the writes have long enough
// timeouts to ride over the unstable leadership.
TEST_F(AdminCliTest, TestSimultaneousLeaderTransferAndAbruptStepdown) {
  SKIP_IF_SLOW_NOT_ALLOWED();

  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  // Wait for the tablet to be running.
  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
  for (auto& ts : tservers) {
    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
  }

  // Start a workload with long timeouts. Everything should eventually go
  // through but it might take a while given the leadership changes.
  TestWorkload workload(cluster_.get());
  workload.set_table_name(kTableId);
  workload.set_timeout_allowed(false);
  workload.set_write_timeout_millis(60000);
  workload.set_num_replicas(FLAGS_num_replicas);
  workload.set_num_write_threads(1);
  workload.set_write_batch_size(1);
  workload.Setup();
  workload.Start();

  // Sometimes this test is flaky under ASAN and the writes are never able to
  // complete, so we'll back off on how frequently we disrupt leadership to give
  // time for progress to be made. This is a duration (MonoDelta), not a count
  // of seconds.
#if defined(ADDRESS_SANITIZER)
  const MonoDelta kLeaderChangePeriod = MonoDelta::FromMilliseconds(6000);
#else
  const MonoDelta kLeaderChangePeriod = MonoDelta::FromMilliseconds(2000);
#endif

  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();
  while (workload.rows_inserted() < 1000) {
    // Issue a graceful stepdown and then an abrupt stepdown once every
    // 'kLeaderChangePeriod'. The results are ignored because the
    // tools might fail due to the constant leadership changes.
    ignore_result(RunKuduTool({
      "tablet",
      "leader_step_down",
      master_addr,
      tablet_id_
    }));
    ignore_result(RunKuduTool({
      "tablet",
      "leader_step_down",
      "--abrupt",
      master_addr,
      tablet_id_
    }));
    SleepFor(kLeaderChangePeriod);
  }
}
// Fixture for step-down tests that run once per LeaderStepDownMode.
// GetParam() selects between the abrupt and the graceful variants.
class TestLeaderStepDown
    : public AdminCliTest,
      public ::testing::WithParamInterface<LeaderStepDownMode> {};

INSTANTIATE_TEST_SUITE_P(
    , TestLeaderStepDown,
    ::testing::Values(LeaderStepDownMode::ABRUPT, LeaderStepDownMode::GRACEFUL));
// Leader failure detection is disabled and no election is started. The tablet
// therefore has no leader, and the leader_step_down tool is expected to succeed
// while reporting that no leader replica was found.
TEST_P(TestLeaderStepDown, TestLeaderStepDownWhenNotPresent) {
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  const vector<string> kTserverFlags = { "--enable_leader_failure_detection=false" };
  const vector<string> kMasterFlags = {
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false" };
  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));

  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
  const MonoDelta kTabletRunningTimeout = MonoDelta::FromSeconds(10);
  for (const auto& server : tservers) {
    ASSERT_OK(WaitUntilTabletRunning(server, tablet_id_, kTabletRunningTimeout));
  }

  // No replica should report a leader that is a member of the committed config.
  int64_t current_term;
  const Status term_status = GetTermFromConsensus(tservers, tablet_id_, &current_term);
  ASSERT_TRUE(term_status.IsNotFound());

  const bool abrupt = GetParam() == LeaderStepDownMode::ABRUPT;
  string stdout;
  ASSERT_OK(RunKuduTool({
    "tablet",
    "leader_step_down",
    Substitute("--abrupt=$0", abrupt),
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_,
  }, &stdout));
  const string expected_msg = Substitute("No leader replica found for tablet $0",
                                         tablet_id_);
  ASSERT_STR_CONTAINS(stdout, expected_msg);
}
// Repeatedly asks the leader to step down (abrupt or graceful, per the test
// parameter) while a write workload runs. It then checks that the workload makes
// progress and that the cluster is consistent afterwards.
TEST_P(TestLeaderStepDown, TestRepeatedLeaderStepDown) {
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  // Speed up leader failure detection and shorten the leader transfer period.
  NO_FATALS(BuildAndStart({ "--raft_heartbeat_interval_ms=50" }));

  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
  for (auto& ts : tservers) {
    ASSERT_OK(WaitUntilTabletRunning(ts,
                                     tablet_id_,
                                     MonoDelta::FromSeconds(10)));
  }

  // Start a workload.
  TestWorkload workload(cluster_.get());
  workload.set_table_name(kTableId);
  workload.set_timeout_allowed(false);
  workload.set_write_timeout_millis(30000);
  workload.set_num_replicas(FLAGS_num_replicas);
  workload.set_num_write_threads(4);
  workload.set_write_batch_size(1);
  workload.Setup();
  workload.Start();

  // Issue stepdown requests repeatedly. If we leave some time for an election,
  // the workload should still make progress.
  const string abrupt_flag = Substitute("--abrupt=$0",
                                        GetParam() == LeaderStepDownMode::ABRUPT);
  string stdout;
  string stderr;
  while (workload.rows_inserted() < 2000) {
    stdout.clear();
    stderr.clear();
    Status s = RunKuduTool({
      "tablet",
      "leader_step_down",
      abrupt_flag,
      cluster_->master()->bound_rpc_addr().ToString(),
      tablet_id_
    }, &stdout, &stderr);
    // An IllegalState error (the contacted replica is no longer the leader) is
    // tolerated, since leadership is changing constantly.
    const bool not_currently_leader = stderr.find(
        Status::IllegalState("").CodeAsString()) != string::npos;
    ASSERT_TRUE(s.ok() || not_currently_leader) << s.ToString() << " stderr: " << stderr;
    SleepFor(MonoDelta::FromMilliseconds(1000));
  }

  // Stop the writers before verifying, so the replicas are compared while no
  // new writes are arriving.
  workload.StopAndJoin();
  NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
}
// After 'kudu table delete' with '-reserve_seconds=0', the table must no longer
// appear in the client's table list.
TEST_F(AdminCliTest, TestDeleteTable) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());

  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();
  shared_ptr<KuduClient> client;
  KuduClientBuilder builder;
  builder.add_master_server_addr(master_addr);
  ASSERT_OK(builder.Build(&client));

  ASSERT_TOOL_OK(
    "table",
    "delete",
    master_addr,
    kTableId,
    "-reserve_seconds=0"
  );

  vector<string> remaining_tables;
  ASSERT_OK(client->ListTables(&remaining_tables));
  ASSERT_TRUE(remaining_tables.empty());
}
// Checks '-soft_deleted_only=true' listing. Before any delete it lists nothing.
// After 'kudu table delete --reserve_seconds=300' the table shows up there.
TEST_F(AdminCliTest, TestListSoftDeletedTables) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());

  // A plain listing shows the table.
  {
    string stdout;
    string stderr;
    Status s = RunKuduTool({
      "table",
      "list",
      cluster_->master()->bound_rpc_addr().ToString(),
    }, &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_STR_CONTAINS(stdout, kTableId);
  }

  {
    // Before any delete, the soft-deleted listing prints only a newline.
    string stdout;
    ASSERT_OK(RunKuduTool({
      "table",
      "list",
      "-soft_deleted_only=true",
      cluster_->master()->bound_rpc_addr().ToString()
    }, &stdout));
    vector<string> stdout_lines = Split(stdout, ",", strings::SkipEmpty());
    ASSERT_EQ(1, stdout_lines.size());
    ASSERT_EQ("\n", stdout_lines[0]);

    // Delete with a non-zero reservation period so the table is kept as a
    // soft-deleted table instead of being removed outright.
    ASSERT_OK(RunKuduTool({
      "table",
      "delete",
      "--reserve_seconds=300",
      cluster_->master()->bound_rpc_addr().ToString(),
      kTableId
    }, &stdout));

    // The soft-deleted listing now includes the table.
    stdout.clear();
    string stderr;
    Status s = RunKuduTool({
      "table",
      "list",
      "-soft_deleted_only=true",
      cluster_->master()->bound_rpc_addr().ToString(),
    }, &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_STR_CONTAINS(stdout, kTableId);
  }
}
// Returns the values the parameterized 'kudu table list' tests pass to
// '--list_table_output_format', in order: "pretty", "json", "json_compact".
static vector<string> TableListFormat() {
  vector<string> formats;
  formats.reserve(3);
  formats.emplace_back("pretty");
  formats.emplace_back("json");
  formats.emplace_back("json_compact");
  return formats;
}
// Fixture for 'kudu table list' tests parameterized by the output format
// string passed to '--list_table_output_format'.
class ListTableCliSimpleParamTest
    : public AdminCliTest,
      public ::testing::WithParamInterface<string> {};

INSTANTIATE_TEST_SUITE_P(, ListTableCliSimpleParamTest, ::testing::ValuesIn(TableListFormat()));
// Verifies that 'kudu table list' mentions the default table in each output
// format under test.
TEST_P(ListTableCliSimpleParamTest, TestListTables) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());
  const string& format = GetParam();
  string stdout;
  string stderr;
  Status s = RunKuduTool({
    "table",
    "list",
    Substitute("--list_table_output_format=$0", format),
    cluster_->master()->bound_rpc_addr().ToString(),
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
  if (format == "pretty") {
    // Splitting on ',' must yield a single piece: the pretty output for this
    // single-table cluster is not a comma-separated list.
    vector<string> stdout_lines = Split(stdout, ",", strings::SkipEmpty());
    ASSERT_EQ(1, stdout_lines.size());
    ASSERT_STR_CONTAINS(stdout, Substitute("$0\n", kTableId));
  } else if (format == "json") {
    ASSERT_STR_CONTAINS(stdout, Substitute("\"name\": \"$0\"", kTableId));
  } else if (format == "json_compact") {
    ASSERT_STR_CONTAINS(stdout, Substitute("\"name\":\"$0\"", kTableId));
  } else {
    FAIL() << "unexpected table list format: " << format;
  }
}
// Fixture for 'kudu table list --list_tablets' tests parameterized by the
// output format string passed to '--list_table_output_format'.
class ListTableCliDetailParamTest
    : public AdminCliTest,
      public ::testing::WithParamInterface<string> {};

INSTANTIATE_TEST_SUITE_P(, ListTableCliDetailParamTest, ::testing::ValuesIn(TableListFormat()));
// Verifies that 'kudu table list --list_tablets' reports multiple tables along
// with their tablet ids and replica uuids in each output format under test.
TEST_P(ListTableCliDetailParamTest, TestListTablesDetail) {
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  // Add another table to test multiple tables output.
  const string kAnotherTableId = "TestAnotherTable";
  auto client_schema = KuduSchema::FromSchema(schema_);
  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  ASSERT_OK(table_creator->table_name(kAnotherTableId)
                .schema(&client_schema)
                .set_range_partition_columns({ "key" })
                .num_replicas(FLAGS_num_replicas)
                .Create());

  // Grab list of tablet_ids from any tserver. With num_replicas equal to
  // num_tablet_servers, each tablet server should host a replica of every
  // tablet.
  vector<TServerDetails*> tservers;
  vector<string> tablet_ids;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ListRunningTabletIds(tservers.front(),
                       MonoDelta::FromSeconds(30), &tablet_ids);
  // The checks below iterate over (and previously indexed into) this list;
  // an empty list would make them vacuous or undefined.
  ASSERT_FALSE(tablet_ids.empty());

  const string& format = GetParam();
  string stdout;
  string stderr;
  Status s = RunKuduTool({
    "table",
    "list",
    "--list_tablets",
    Substitute("--list_table_output_format=$0", format),
    cluster_->master()->bound_rpc_addr().ToString(),
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

  if (format == "pretty") {
    // Verify multiple tables along with their tablets and replica-uuids.
    ASSERT_STR_CONTAINS(stdout, kTableId);
    ASSERT_STR_CONTAINS(stdout, kAnotherTableId);
    for (const auto& tablet_id : tablet_ids) {
      ASSERT_STR_CONTAINS(stdout, tablet_id);
    }
    for (const auto& ts : tservers) {
      ASSERT_STR_CONTAINS(stdout, ts->uuid());
    }
  } else if (format == "json") {
    // The 'json' format output should contain the table id for the table.
    ASSERT_STR_CONTAINS(stdout, Substitute("\"name\": \"$0\"", kTableId));
    ASSERT_STR_CONTAINS(stdout, Substitute("\"name\": \"$0\"", kAnotherTableId));
    for (const auto& tablet_id : tablet_ids) {
      ASSERT_STR_CONTAINS(stdout, Substitute("\"tablet_id\": \"$0\"", tablet_id));
    }
    for (const auto& ts : tservers) {
      ASSERT_STR_CONTAINS(stdout, Substitute("\"uuid\": \"$0\"", ts->uuid()));
    }
  } else if (format == "json_compact") {
    // The 'json_compact' format output should contain the table id for the table.
    ASSERT_STR_CONTAINS(stdout, Substitute("\"name\":\"$0\"", kTableId));
    ASSERT_STR_CONTAINS(stdout, Substitute("\"name\":\"$0\"", kAnotherTableId));
    for (const auto& tablet_id : tablet_ids) {
      ASSERT_STR_CONTAINS(stdout, Substitute("\"tablet_id\":\"$0\"", tablet_id));
    }
    for (const auto& ts : tservers) {
      ASSERT_STR_CONTAINS(stdout, Substitute("\"uuid\":\"$0\"", ts->uuid()));
    }
  } else {
    FAIL() << "unexpected table list format: " << format;
  }
}
// With mocked table metrics enabled on the master, 'kudu table statistics'
// must print the mocked on-disk size and live row count.
TEST_F(AdminCliTest, TestGetTableStatistics) {
  vector<string> master_flags = {
    "--mock_table_metrics_for_testing=true",
    "--on_disk_size_for_testing=1024",
    "--live_row_count_for_testing=1000",
  };
  NO_FATALS(BuildAndStart({}, master_flags));

  string out;
  string err;
  Status s = RunKuduTool({
    "table", "statistics", cluster_->master()->bound_rpc_addr().ToString(), kTableId,
  }, &out, &err);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
  ASSERT_STR_CONTAINS(out, "on disk size: 1024\nlive row count: 1000\n");
}
// Exercises 'kudu table describe' on the default table and on a table with
// every column type, multi-level hash partitioning, multiple range partitions
// with a non-covered gap, an owner and a comment. Checks the plain output,
// the '-show_attributes'/'-show_column_comment' output and the
// '-show_avro_format_schema' output.
TEST_F(AdminCliTest, TestDescribeTable) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());

  // The default table has a range partition with only one partition.
  string stdout, stderr;
  Status s = RunKuduTool({
    "table",
    "describe",
    cluster_->master()->bound_rpc_addr().ToString(),
    kTableId
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
  ASSERT_STR_CONTAINS(stdout,
      "(\n"
      " key INT32 NOT NULL,\n"
      " int_val INT32 NOT NULL,\n"
      " string_val STRING NULLABLE,\n"
      " PRIMARY KEY (key)\n"
      ")\n"
      "RANGE (key) (\n"
      " PARTITION UNBOUNDED"
      "\n"
      ")\n"
      "OWNER alice\n"
      "REPLICAS 1\n"
      "COMMENT ");

  // Test a table with all types in its schema, multiple hash partitioning
  // levels, multiple range partitions, and non-covered ranges.
  const string kAnotherTableId = "TestAnotherTable";
  KuduSchema schema;

  // Build the schema.
  {
    KuduSchemaBuilder builder;
    builder.AddColumn("key_hash0")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_hash1")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_hash2")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_range")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("int8_val")->Type(KuduColumnSchema::INT8)
        ->Compression(KuduColumnStorageAttributes::CompressionType::NO_COMPRESSION)
        ->Encoding(KuduColumnStorageAttributes::EncodingType::PLAIN_ENCODING);
    builder.AddColumn("int16_val")->Type(KuduColumnSchema::INT16)
        ->Compression(KuduColumnStorageAttributes::CompressionType::SNAPPY)
        ->Encoding(KuduColumnStorageAttributes::EncodingType::RLE);
    builder.AddColumn("int32_val")->Type(KuduColumnSchema::INT32)
        ->Compression(KuduColumnStorageAttributes::CompressionType::LZ4)
        ->Encoding(KuduColumnStorageAttributes::EncodingType::BIT_SHUFFLE);
    builder.AddColumn("int64_val")->Type(KuduColumnSchema::INT64)
        ->Compression(KuduColumnStorageAttributes::CompressionType::ZLIB)
        ->Default(KuduValue::FromInt(123));
    builder.AddColumn("timestamp_val")->Type(KuduColumnSchema::UNIXTIME_MICROS);
    builder.AddColumn("date_val")->Type(KuduColumnSchema::DATE);
    builder.AddColumn("string_val")->Type(KuduColumnSchema::STRING)
        ->Encoding(KuduColumnStorageAttributes::EncodingType::PREFIX_ENCODING)
        ->Default(KuduValue::CopyString(Slice("hello")))
        ->Comment("comment for hello");
    builder.AddColumn("bool_val")->Type(KuduColumnSchema::BOOL)
        ->Default(KuduValue::FromBool(false));
    builder.AddColumn("float_val")->Type(KuduColumnSchema::FLOAT);
    builder.AddColumn("double_val")->Type(KuduColumnSchema::DOUBLE)
        ->Default(KuduValue::FromDouble(123.4));
    builder.AddColumn("binary_val")->Type(KuduColumnSchema::BINARY)
        ->Encoding(KuduColumnStorageAttributes::EncodingType::DICT_ENCODING);
    builder.AddColumn("decimal_val")->Type(KuduColumnSchema::DECIMAL)
        ->Precision(30)
        ->Scale(4);
    builder.SetPrimaryKey({ "key_hash0", "key_hash1", "key_hash2", "key_range" });
    ASSERT_OK(builder.Build(&schema));
  }

  // Set up partitioning and create the table. The range partitions are
  // [0, 1) and [2, 3), leaving [1, 2) non-covered.
  {
    unique_ptr<KuduPartialRow> lower_bound0(schema.NewRow());
    ASSERT_OK(lower_bound0->SetInt32("key_range", 0));
    unique_ptr<KuduPartialRow> upper_bound0(schema.NewRow());
    ASSERT_OK(upper_bound0->SetInt32("key_range", 1));
    unique_ptr<KuduPartialRow> lower_bound1(schema.NewRow());
    ASSERT_OK(lower_bound1->SetInt32("key_range", 2));
    unique_ptr<KuduPartialRow> upper_bound1(schema.NewRow());
    ASSERT_OK(upper_bound1->SetInt32("key_range", 3));
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    ASSERT_OK(table_creator->table_name(kAnotherTableId)
                  .schema(&schema)
                  .add_hash_partitions({"key_hash0"}, 2)
                  .add_hash_partitions({"key_hash1", "key_hash2"}, 3)
                  .set_range_partition_columns({"key_range"})
                  .add_range_partition(lower_bound0.release(), upper_bound0.release())
                  .add_range_partition(lower_bound1.release(), upper_bound1.release())
                  .num_replicas(FLAGS_num_replicas)
                  .set_owner("alice")
                  .set_comment("table comment")
                  .Create());
  }

  // OK, all that busywork is done. Test the describe output.
  stdout.clear();
  stderr.clear();
  s = RunKuduTool({
    "table",
    "describe",
    cluster_->master()->bound_rpc_addr().ToString(),
    kAnotherTableId,
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
  ASSERT_STR_CONTAINS(stdout,
      "(\n"
      " key_hash0 INT32 NOT NULL,\n"
      " key_hash1 INT32 NOT NULL,\n"
      " key_hash2 INT32 NOT NULL,\n"
      " key_range INT32 NOT NULL,\n"
      " int8_val INT8 NULLABLE,\n"
      " int16_val INT16 NULLABLE,\n"
      " int32_val INT32 NULLABLE,\n"
      " int64_val INT64 NULLABLE,\n"
      " timestamp_val UNIXTIME_MICROS NULLABLE,\n"
      " date_val DATE NULLABLE,\n"
      " string_val STRING NULLABLE,\n"
      " bool_val BOOL NULLABLE,\n"
      " float_val FLOAT NULLABLE,\n"
      " double_val DOUBLE NULLABLE,\n"
      " binary_val BINARY NULLABLE,\n"
      " decimal_val DECIMAL(30, 4) NULLABLE,\n"
      " PRIMARY KEY (key_hash0, key_hash1, key_hash2, key_range)\n"
      ")\n"
      "HASH (key_hash0) PARTITIONS 2,\n"
      "HASH (key_hash1, key_hash2) PARTITIONS 3,\n"
      "RANGE (key_range) (\n"
      " PARTITION 0 <= VALUES < 1,\n"
      " PARTITION 2 <= VALUES < 3\n"
      ")\n"
      "OWNER alice\n"
      "REPLICAS 1\n"
      "COMMENT table comment");

  // Test the describe output with `-show_attributes=true`.
  stdout.clear();
  stderr.clear();
  s = RunKuduTool({
    "table",
    "describe",
    cluster_->master()->bound_rpc_addr().ToString(),
    kAnotherTableId,
    "-show_attributes=true",
    "-show_column_comment=true"
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
  ASSERT_STR_CONTAINS(
      stdout,
      "(\n"
      " key_hash0 INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " key_hash1 INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " key_hash2 INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " key_range INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " int8_val INT8 NULLABLE PLAIN_ENCODING NO_COMPRESSION - -,\n"
      " int16_val INT16 NULLABLE RLE SNAPPY - -,\n"
      " int32_val INT32 NULLABLE BIT_SHUFFLE LZ4 - -,\n"
      " int64_val INT64 NULLABLE AUTO_ENCODING ZLIB 123 123,\n"
      " timestamp_val UNIXTIME_MICROS NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " date_val DATE NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " string_val STRING NULLABLE PREFIX_ENCODING DEFAULT_COMPRESSION \"hello\" \"hello\" "
      "comment for hello,\n"
      " bool_val BOOL NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION false false,\n"
      " float_val FLOAT NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " double_val DOUBLE NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION 123.4 123.4,\n"
      " binary_val BINARY NULLABLE DICT_ENCODING DEFAULT_COMPRESSION - -,\n"
      " decimal_val DECIMAL(30, 4) NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " PRIMARY KEY (key_hash0, key_hash1, key_hash2, key_range)\n"
      ")\n"
      "HASH (key_hash0) PARTITIONS 2,\n"
      "HASH (key_hash1, key_hash2) PARTITIONS 3,\n"
      "RANGE (key_range) (\n"
      " PARTITION 0 <= VALUES < 1,\n"
      " PARTITION 2 <= VALUES < 3\n"
      ")\n"
      "OWNER alice\n"
      "REPLICAS 1");

  // Test the describe output with `-show_avro_format_schema`. Clear the
  // buffers first, like every other rerun in this test, so that output from
  // the previous run does not leak into this check or its diagnostics.
  stdout.clear();
  stderr.clear();
  s = RunKuduTool({
    "table",
    "describe",
    cluster_->master()->bound_rpc_addr().ToString(),
    kAnotherTableId,
    "-show_avro_format_schema"
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
  ASSERT_STR_CONTAINS(
      stdout,
      Substitute(
          "{\n"
          " \"type\": \"table\",\n"
          " \"name\": \"TestAnotherTable\",\n"
          " \"namespace\": \"kudu.cluster.$0\",\n"
          " \"fields\": [\n"
          " {\n"
          " \"name\": \"key_hash0\",\n"
          " \"type\": \"int\"\n"
          " },\n"
          " {\n"
          " \"name\": \"key_hash1\",\n"
          " \"type\": \"int\"\n"
          " },\n"
          " {\n"
          " \"name\": \"key_hash2\",\n"
          " \"type\": \"int\"\n"
          " },\n"
          " {\n"
          " \"name\": \"key_range\",\n"
          " \"type\": \"int\"\n"
          " },\n"
          " {\n"
          " \"name\": \"int8_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"int\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"int16_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"int\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"int32_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"int\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"int64_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"long\"\n"
          " ],\n"
          " \"default\": \"123\"\n"
          " },\n"
          " {\n"
          " \"name\": \"timestamp_val\",\n"
          " \"type\": [\n"
          " {\n"
          " \"type\": \"long\",\n"
          " \"logicalType\": \"time-micros\"\n"
          " }\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"date_val\",\n"
          " \"type\": [\n"
          " {\n"
          " \"type\": \"int\",\n"
          " \"logicalType\": \"date\"\n"
          " }\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"string_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"string\"\n"
          " ],\n"
          " \"default\": \"\\\"hello\\\"\"\n"
          " },\n"
          " {\n"
          " \"name\": \"bool_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"bool\"\n"
          " ],\n"
          " \"default\": \"false\"\n"
          " },\n"
          " {\n"
          " \"name\": \"float_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"float\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"double_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"double\"\n"
          " ],\n"
          " \"default\": \"123.4\"\n"
          " },\n"
          " {\n"
          " \"name\": \"binary_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"bytes\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"decimal_val\",\n"
          " \"type\": [\n"
          " {\n"
          " \"type\": \"bytes\",\n"
          " \"logicalType\": \"decimal\"\n"
          " }\n"
          " ]\n"
          " }\n"
          " ]\n"
          "}\n",
          client_->cluster_id())
  );
}
// Checks that the '-show_column_comment' and '-show_attributes' flags of
// 'kudu table describe' work on their own and in combination.
TEST_F(AdminCliTest, TestDescribeTableColumnFlags) {
  NO_FATALS(BuildAndStart());
  const string kTableName = "TestAnotherTable";

  // Create a two-column table in which only 'bar' carries a comment.
  {
    KuduSchemaBuilder builder;
    builder.AddColumn("foo")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("bar")->Type(KuduColumnSchema::INT32)->NotNull()
        ->Comment("comment for bar");
    builder.SetPrimaryKey({"foo"});
    KuduSchema schema;
    ASSERT_OK(builder.Build(&schema));
    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
    ASSERT_OK(creator->table_name(kTableName)
                  .schema(&schema)
                  .set_range_partition_columns({"foo"})
                  .Create());
  }

  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();
  // Runs 'kudu table describe <master> <table>' with 'extra_flags' appended,
  // capturing the tool's stdout into 'out'.
  const auto describe = [&](const vector<string>& extra_flags, string* out) {
    vector<string> args = { "table", "describe", master_addr, kTableName };
    args.insert(args.end(), extra_flags.begin(), extra_flags.end());
    return RunKuduTool(args, out);
  };

  string out;

  // Column comments only.
  ASSERT_OK(describe({ "-show_column_comment" }, &out));
  ASSERT_STR_CONTAINS(out,
                      "(\n"
                      " foo INT32 NOT NULL,\n"
                      " bar INT32 NOT NULL comment for bar,\n"
                      " PRIMARY KEY (foo)\n"
                      ")\n");

  // Column attributes only.
  out.clear();
  ASSERT_OK(describe({ "-show_attributes" }, &out));
  ASSERT_STR_CONTAINS(out,
                      "(\n"
                      " foo INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
                      " bar INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
                      " PRIMARY KEY (foo)\n"
                      ")\n");

  // Both together: attributes first, then the comment.
  out.clear();
  ASSERT_OK(describe({ "-show_attributes", "-show_column_comment" }, &out));
  ASSERT_STR_CONTAINS(
      out,
      "(\n"
      " foo INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " bar INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - - comment for bar,\n"
      " PRIMARY KEY (foo)\n"
      ")\n");
}
// A table created with an empty owner, which '--allow_empty_owner=true'
// permits, is described with an empty OWNER clause.
TEST_F(AdminCliTest, TestDescribeTableNoOwner) {
  NO_FATALS(BuildAndStart({}, {"--allow_empty_owner=true"}));

  KuduSchema schema;
  {
    KuduSchemaBuilder builder;
    builder.AddColumn("foo")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
    ASSERT_OK(builder.Build(&schema));
  }

  const string kTableName = "table_without_owner";
  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  ASSERT_OK(creator->table_name(kTableName)
                .schema(&schema)
                .set_range_partition_columns({"foo"})
                .num_replicas(1)
                .set_owner("")
                .Create());

  string out;
  ASSERT_OK(RunKuduTool({ "table", "describe",
                          cluster_->master()->bound_rpc_addr().ToString(), kTableName },
                        &out));
  ASSERT_STR_CONTAINS(out, "OWNER \n");
}
// Verifies that 'kudu table describe' prints a per-range custom hash schema
// next to the range it applies to, while ranges that use the table-wide hash
// schema are printed without one.
TEST_F(AdminCliTest, TestDescribeTableCustomHashSchema) {
  NO_FATALS(BuildAndStart({}, {}, {}, /*create_table*/false));
  KuduSchema schema;

  // Build the schema
  {
    KuduSchemaBuilder builder;
    builder.AddColumn("key_range")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_hash0")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_hash1")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.SetPrimaryKey({"key_range", "key_hash0", "key_hash1"});
    ASSERT_OK(builder.Build(&schema));
  }

  constexpr const char* const kTableName = "table_with_custom_hash_schema";
  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  table_creator->table_name(kTableName)
      .schema(&schema)
      .add_hash_partitions({"key_hash0"}, 2)
      .set_range_partition_columns({"key_range"})
      .num_replicas(1);

  // Create a KuduRangePartition with custom hash schema. Use ASSERT_OK rather
  // than CHECK_OK so a failure fails this test instead of aborting the binary.
  {
    unique_ptr<KuduPartialRow> lower(schema.NewRow());
    ASSERT_OK(lower->SetInt32("key_range", 0));
    unique_ptr<KuduPartialRow> upper(schema.NewRow());
    ASSERT_OK(upper->SetInt32("key_range", 100));
    unique_ptr<client::KuduRangePartition> partition(
        new client::KuduRangePartition(lower.release(), upper.release()));
    partition->add_hash_partitions({"key_hash1"}, 3);
    table_creator->add_custom_range_partition(partition.release());
  }

  // Create a partition with table wide hash schema
  {
    unique_ptr<KuduPartialRow> lower(schema.NewRow());
    ASSERT_OK(lower->SetInt32("key_range", 100));
    unique_ptr<KuduPartialRow> upper(schema.NewRow());
    ASSERT_OK(upper->SetInt32("key_range", 200));
    table_creator->add_range_partition(lower.release(), upper.release());
  }

  // Create the table and run the tool
  ASSERT_OK(table_creator->Create());
  string stdout;
  ASSERT_OK(RunKuduTool(
      {
        "table",
        "describe",
        cluster_->master()->bound_rpc_addr().ToString(),
        kTableName,
      },
      &stdout));
  ASSERT_STR_CONTAINS(stdout, "PARTITION 0 <= VALUES < 100 HASH(key_hash1) PARTITIONS 3,\n"
                              " PARTITION 100 <= VALUES < 200");
}
// Fixture for 'kudu table list --list_tablets' tests. The parameter tuple is
// (value of --show_tablet_partition_info, value of --show_hash_partition_info,
// value of --list_table_output_format).
class ListTableCliParamTest
    : public AdminCliTest,
      public ::testing::WithParamInterface<tuple<bool, bool, string>> {};

INSTANTIATE_TEST_SUITE_P(, ListTableCliParamTest,
                         ::testing::Combine(::testing::Bool(),
                                            ::testing::Bool(),
                                            ::testing::ValuesIn(TableListFormat())));
// Basic test that the kudu tool works in the list tablets case, optionally
// printing each tablet's partition (with or without hash partition info).
TEST_P(ListTableCliParamTest, ListTabletWithPartitionInfo) {
  const bool show_tablet_partition_info = std::get<0>(GetParam());
  const bool show_hash_partition_info = std::get<1>(GetParam());
  const string& format = std::get<2>(GetParam());
  const auto show_hp = show_hash_partition_info ? PartitionSchema::HashPartitionInfo::SHOW
                                                : PartitionSchema::HashPartitionInfo::HIDE;
  const auto kTimeout = MonoDelta::FromSeconds(30);

  // This combination of parameters does not meet expectations: hash partition
  // info is only meaningful when tablet partition info is shown.
  if (!show_tablet_partition_info && show_hash_partition_info) {
    return;
  }

  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());

  // Remember the tablets that exist before the test table is created.
  vector<TServerDetails*> tservers;
  vector<string> base_tablet_ids;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ListRunningTabletIds(tservers.front(), kTimeout, &base_tablet_ids);

  // Test a table with multiple hash partitioning levels, multiple range
  // partitions, and non-covered ranges.
  constexpr const char* const kTableName = "TestTableListPartition";
  KuduSchema schema;

  // Build the schema.
  {
    KuduSchemaBuilder builder;
    builder.AddColumn("key_hash0")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_hash1")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_hash2")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_range")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.SetPrimaryKey({ "key_hash0", "key_hash1", "key_hash2", "key_range" });
    ASSERT_OK(builder.Build(&schema));
  }

  // Set up partitioning and create the table.
  {
    unique_ptr<KuduPartialRow> lower_bound0(schema.NewRow());
    ASSERT_OK(lower_bound0->SetInt32("key_range", 0));
    unique_ptr<KuduPartialRow> upper_bound0(schema.NewRow());
    ASSERT_OK(upper_bound0->SetInt32("key_range", 1));
    unique_ptr<KuduPartialRow> lower_bound1(schema.NewRow());
    ASSERT_OK(lower_bound1->SetInt32("key_range", 2));
    unique_ptr<KuduPartialRow> upper_bound1(schema.NewRow());
    ASSERT_OK(upper_bound1->SetInt32("key_range", 3));
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    ASSERT_OK(table_creator->table_name(kTableName)
                  .schema(&schema)
                  .add_hash_partitions({"key_hash0"}, 2)
                  .add_hash_partitions({"key_hash1", "key_hash2"}, 3)
                  .set_range_partition_columns({"key_range"})
                  .add_range_partition(lower_bound0.release(), upper_bound0.release())
                  .add_range_partition(lower_bound1.release(), upper_bound1.release())
                  .num_replicas(FLAGS_num_replicas)
                  .Create());
  }

  // The tablets of the new table are those running now that were not running
  // before it was created. (The previous code computed the reverse set
  // difference, which was always empty, making the json checks vacuous.)
  vector<string> new_tablet_ids;
  ListRunningTabletIds(tservers.front(), kTimeout, &new_tablet_ids);
  const std::unordered_set<string> base_tablet_id_set(base_tablet_ids.begin(),
                                                      base_tablet_ids.end());
  vector<string> delta_tablet_ids;
  for (const auto& tablet_id : new_tablet_ids) {
    if (!ContainsKey(base_tablet_id_set, tablet_id)) {
      delta_tablet_ids.push_back(tablet_id);
    }
  }
  ASSERT_FALSE(delta_tablet_ids.empty());

  // Test the list tablet with partition output.
  string stdout;
  string stderr;
  Status s = RunKuduTool({
    "table",
    "list",
    "--list_tablets",
    Substitute("--show_tablet_partition_info=$0", show_tablet_partition_info),
    Substitute("--show_hash_partition_info=$0", show_hash_partition_info),
    Substitute("--list_table_output_format=$0", format),
    "--tables",
    kTableName,
    cluster_->master()->bound_rpc_addr().ToString(),
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

  client::sp::shared_ptr<KuduTable> table;
  ASSERT_OK(client_->OpenTable(kTableName, &table));
  const auto& partition_schema = table->partition_schema();
  const auto& schema_internal = KuduSchema::ToSchema(table->schema());

  // make sure table name correct
  ASSERT_STR_CONTAINS(stdout, kTableName);

  // In pretty format, each tablet id is followed by its partition string when
  // partition info is requested.
  master::ListTablesResponsePB tables_info;
  ASSERT_OK(ListTablesWithInfo(
      cluster_->master_proxy(), kTableName, kTimeout, &tables_info));
  for (const auto& table_pb : tables_info.tables()) {
    for (const auto& pt : table_pb.tablet_with_partition()) {
      Partition partition;
      Partition::FromPB(pt.partition(), &partition);
      string partition_str;
      if (show_tablet_partition_info) {
        partition_str = " : " + partition_schema.PartitionDebugString(partition,
                                                                      schema_internal,
                                                                      show_hp);
      }
      const string tablet_with_partition = pt.tablet_id() + partition_str;
      if (format == "pretty") {
        ASSERT_STR_CONTAINS(stdout, tablet_with_partition);
      }
    }
  }

  if (format == "pretty") {
    for (const auto& tablet_id : delta_tablet_ids) {
      ASSERT_STR_CONTAINS(stdout, tablet_id);
    }
    for (const auto& ts : tservers) {
      ASSERT_STR_CONTAINS(stdout, ts->uuid());
    }
  } else if (format == "json") {
    ASSERT_STR_CONTAINS(stdout, Substitute("\"name\": \"$0\"", kTableName));
    for (const auto& tablet_id : delta_tablet_ids) {
      ASSERT_STR_CONTAINS(stdout, Substitute("\"tablet_id\": \"$0\"", tablet_id));
    }
    for (const auto& ts : tservers) {
      ASSERT_STR_CONTAINS(stdout, Substitute("\"uuid\": \"$0\"", ts->uuid()));
    }
  } else if (format == "json_compact") {
    ASSERT_STR_CONTAINS(stdout, Substitute("\"name\":\"$0\"", kTableName));
    for (const auto& tablet_id : delta_tablet_ids) {
      ASSERT_STR_CONTAINS(stdout, Substitute("\"tablet_id\":\"$0\"", tablet_id));
    }
    for (const auto& ts : tservers) {
      ASSERT_STR_CONTAINS(stdout, Substitute("\"uuid\":\"$0\"", ts->uuid()));
    }
  } else {
    FAIL() << "unexpected table list format: " << format;
  }
}
// Verifies 'kudu table locate_row' on the single-tablet default table, plus
// rejection of key values that cannot be parsed into the INT32 key column.
TEST_F(AdminCliTest, TestLocateRow) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());

  // Test an OK case. Not much going on here since the table has only one
  // tablet, which covers the whole universe.
  string stdout, stderr;
  Status s = RunKuduTool({
    "table",
    "locate_row",
    cluster_->master()->bound_rpc_addr().ToString(),
    kTableId,
    "[-1]"
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

  // Grab list of tablet_ids from the tserver and check the output.
  vector<TServerDetails*> tservers;
  vector<string> tablet_ids;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ListRunningTabletIds(tservers.front(),
                       MonoDelta::FromSeconds(30),
                       &tablet_ids);
  ASSERT_EQ(1, tablet_ids.size());
  ASSERT_STR_CONTAINS(stdout, tablet_ids[0]);

  // Test a few error cases. Each one must fail with a runtime error whose
  // stderr contains 'error'; the tool output is attached on failure.
  const auto check_bad_input = [&](const string& json, const string& error) {
    string out, err;
    Status status = RunKuduTool({
      "table",
      "locate_row",
      cluster_->master()->bound_rpc_addr().ToString(),
      kTableId,
      json,
    }, &out, &err);
    ASSERT_TRUE(status.IsRuntimeError()) << ToolRunInfo(status, out, err);
    ASSERT_STR_CONTAINS(err, error);
  };

  // String instead of int.
  NO_FATALS(check_bad_input("[\"foo\"]", "unable to parse"));

  // Float instead of int.
  NO_FATALS(check_bad_input("[1.2]", "unable to parse"));

  // Overflow (recall the key is INT32).
  NO_FATALS(check_bad_input(
      Substitute("[$0]", std::to_string(std::numeric_limits<int64_t>::max())),
      "out of range"));
}
// Verifies 'kudu table locate_row' against a table with a composite primary
// key, hash and range partitioning and a non-covered range, including a
// variety of malformed-input error cases.
TEST_F(AdminCliTest, TestLocateRowMore) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());

  // Make a complex schema with multiple columns in the primary key, hash and
  // range partitioning, and non-covered ranges.
  const string kAnotherTableId = "TestAnotherTable";
  KuduSchema schema;

  // Build the schema.
  KuduSchemaBuilder builder;
  builder.AddColumn("key_hash")->Type(KuduColumnSchema::STRING)->NotNull();
  builder.AddColumn("key_range")->Type(KuduColumnSchema::INT32)->NotNull();
  builder.SetPrimaryKey({ "key_hash", "key_range" });
  ASSERT_OK(builder.Build(&schema));

  // Set up partitioning and create the table. The range partitions are
  // [0, 1) and [2, 3), leaving [1, 2) non-covered.
  unique_ptr<KuduPartialRow> lower_bound0(schema.NewRow());
  ASSERT_OK(lower_bound0->SetInt32("key_range", 0));
  unique_ptr<KuduPartialRow> upper_bound0(schema.NewRow());
  ASSERT_OK(upper_bound0->SetInt32("key_range", 1));
  unique_ptr<KuduPartialRow> lower_bound1(schema.NewRow());
  ASSERT_OK(lower_bound1->SetInt32("key_range", 2));
  unique_ptr<KuduPartialRow> upper_bound1(schema.NewRow());
  ASSERT_OK(upper_bound1->SetInt32("key_range", 3));
  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  ASSERT_OK(table_creator->table_name(kAnotherTableId)
                .schema(&schema)
                .add_hash_partitions({ "key_hash" }, 2)
                .set_range_partition_columns({ "key_range" })
                .add_range_partition(lower_bound0.release(), upper_bound0.release())
                .add_range_partition(lower_bound1.release(), upper_bound1.release())
                .num_replicas(FLAGS_num_replicas)
                .Create());

  vector<TServerDetails*> tservers;
  vector<string> tablet_ids;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ListRunningTabletIds(tservers.front(),
                       MonoDelta::FromSeconds(30),
                       &tablet_ids);
  std::unordered_set<string> tablet_id_set(tablet_ids.begin(), tablet_ids.end());

  // Since there isn't a great alternative way to validate the answer the tool
  // gives, and the scan token code underlying the implementation is extensively
  // tested, we won't overexert ourselves checking correctness, and instead just
  // do sanity checks and tool usability checks.
  string stdout, stderr;
  Status s = RunKuduTool({
    "table",
    "locate_row",
    cluster_->master()->bound_rpc_addr().ToString(),
    kAnotherTableId,
    "[\"foo bar\",0]"
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
  StripWhiteSpace(&stdout);
  const auto tablet_id_for_0 = stdout;
  ASSERT_TRUE(ContainsKey(tablet_id_set, tablet_id_for_0))
      << "expected to find tablet id " << tablet_id_for_0;

  // A row in a different range partition should be in a different tablet.
  stdout.clear();
  stderr.clear();
  s = RunKuduTool({
    "table",
    "locate_row",
    cluster_->master()->bound_rpc_addr().ToString(),
    kAnotherTableId,
    "[\"foo\",2]"
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
  StripWhiteSpace(&stdout);
  const auto tablet_id_for_2 = stdout;
  // Validate the tablet located for this second row (previously this check
  // mistakenly re-tested 'tablet_id_for_0').
  ASSERT_TRUE(ContainsKey(tablet_id_set, tablet_id_for_2))
      << "expected to find tablet id " << tablet_id_for_2;
  ASSERT_NE(tablet_id_for_0, tablet_id_for_2);

  // Test a few error cases. Each one must fail with a runtime error whose
  // stderr contains 'error'; the tool output is attached on failure.
  const auto check_bad_input = [&](const string& json, const string& error) {
    string out, err;
    Status status = RunKuduTool({
      "table",
      "locate_row",
      cluster_->master()->bound_rpc_addr().ToString(),
      kAnotherTableId,
      json,
    }, &out, &err);
    ASSERT_TRUE(status.IsRuntimeError()) << ToolRunInfo(status, out, err);
    ASSERT_STR_CONTAINS(err, error);
  };

  // Test locating a row lying in a non-covered range.
  NO_FATALS(check_bad_input(
      "[\"foo\",1]",
      "row does not belong to any currently existing tablet"));

  // Test providing a missing or incomplete primary key.
  NO_FATALS(check_bad_input(
      "[]",
      "wrong number of key columns specified: expected 2 but received 0"));
  NO_FATALS(check_bad_input(
      "[\"foo\"]",
      "wrong number of key columns specified: expected 2 but received 1"));

  // Test providing too many key column values.
  NO_FATALS(check_bad_input(
      "[\"foo\",2,\"bar\"]",
      "wrong number of key columns specified: expected 2 but received 3"));

  // Test providing an invalid value for a key column when there's multiple
  // key columns.
  NO_FATALS(check_bad_input("[\"foo\",\"bar\"]", "unable to parse"));

  // Test providing bad json.
  NO_FATALS(check_bad_input("[", "JSON text is corrupt"));
  NO_FATALS(check_bad_input("[\"foo\",]", "JSON text is corrupt"));

  // Test providing valid JSON that's not an array.
  NO_FATALS(check_bad_input(
      "{ \"key_hash\" : \"foo\", \"key_range\" : 2 }",
      "wrong type during field extraction: expected object array"));
}
// Verifies 'kudu table locate_row -check_row_existence': it reports a runtime
// error for a missing row and prints the row once it has been inserted.
TEST_F(AdminCliTest, TestLocateRowAndCheckRowPresence) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());

  // Grab list of tablet_ids from any tserver so we can check the output.
  vector<TServerDetails*> tservers;
  vector<string> tablet_ids;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ListRunningTabletIds(tservers.front(),
                       MonoDelta::FromSeconds(30),
                       &tablet_ids);
  ASSERT_EQ(1, tablet_ids.size());
  const string& expected_tablet_id = tablet_ids[0];

  // Test the case when the row does not exist.
  string stdout, stderr;
  Status s = RunKuduTool({
    "table",
    "locate_row",
    cluster_->master()->bound_rpc_addr().ToString(),
    kTableId,
    "[0]",
    "-check_row_existence",
  }, &stdout, &stderr);
  ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, stdout, stderr);
  ASSERT_STR_CONTAINS(stdout, expected_tablet_id);
  ASSERT_STR_CONTAINS(stderr, "row does not exist");

  // Insert row with key = 0. Wrap CreateClient() in NO_FATALS so that a fatal
  // failure inside it stops the test instead of continuing with no client.
  client::sp::shared_ptr<KuduClient> client;
  NO_FATALS(CreateClient(&client));
  client::sp::shared_ptr<KuduTable> table;
  ASSERT_OK(client->OpenTable(kTableId, &table));
  unique_ptr<KuduInsert> insert(table->NewInsert());
  auto* row = insert->mutable_row();
  ASSERT_OK(row->SetInt32("key", 0));
  ASSERT_OK(row->SetInt32("int_val", 12345));
  ASSERT_OK(row->SetString("string_val", "hello"));
  const string row_str = row->ToString();
  auto session = client->NewSession();
  ASSERT_OK(session->Apply(insert.release()));
  ASSERT_OK(session->Flush());
  ASSERT_OK(session->Close());

  // Test the case when the row exists. Since the scan is done by a subprocess
  // using a different client instance, it's possible the scan will not
  // immediately retrieve the row even though the write has already succeeded.
  // This use case is what timestamp propagation is for, but there's no way to
  // propagate a timestamp to a tool (and there shouldn't be). Instead, we
  // ASSERT_EVENTUALLY.
  ASSERT_EVENTUALLY([&]() {
    stdout.clear();
    stderr.clear();
    s = RunKuduTool({
      "table",
      "locate_row",
      cluster_->master()->bound_rpc_addr().ToString(),
      kTableId,
      "[0]",
      "-check_row_existence",
    }, &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_STR_CONTAINS(stdout, expected_tablet_id);
    ASSERT_STR_CONTAINS(stdout, row_str);
  });
}
// Both the master and the tablet server 'dump_memtrackers' actions should
// succeed, and their output should reflect the expected tracker hierarchy.
TEST_F(AdminCliTest, TestDumpMemTrackers) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());

  // Find the single tablet on the tablet server; its tracker id is checked in
  // the tablet server's dump below.
  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  vector<string> tablet_ids;
  ListRunningTabletIds(tservers.front(), MonoDelta::FromSeconds(30), &tablet_ids);
  ASSERT_EQ(1, tablet_ids.size());
  const string tablet_tracker_id = Substitute("tablet-$0", tablet_ids[0]);

  string out;
  string err;

  // Master: the sys catalog tablet's tracker has 'server' as its parent.
  Status s = RunKuduTool({
    "master", "dump_memtrackers",
    cluster_->master()->bound_rpc_hostport().ToString(), "-format=csv",
  }, &out, &err);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
  ASSERT_STR_CONTAINS(out, Substitute("tablet-$0,server",
                                      master::SysCatalogTable::kSysCatalogTabletId));

  // Tablet server: the dump has the tablet's tracker and at least one tracker
  // whose parent is that tablet tracker.
  out.clear();
  err.clear();
  s = RunKuduTool({
    "tserver", "dump_memtrackers",
    cluster_->tablet_server(0)->bound_rpc_hostport().ToString(),
    "-memtracker_output=json_compact",
  }, &out, &err);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
  ASSERT_STR_CONTAINS(out, Substitute("\"id\":\"$0\"", tablet_tracker_id));
  ASSERT_STR_CONTAINS(out, Substitute("\"parent_id\":\"$0\"", tablet_tracker_id));
}
TEST_F(AdminCliTest, TestAuthzResetCacheIncorrectMasterAddressList) {
  ExternalMiniClusterOptions opts;
  opts.num_masters = 3;
  cluster_.reset(new ExternalMiniCluster(std::move(opts)));
  ASSERT_OK(cluster_->Start());

  // The actual list of all three masters, and a list containing only the
  // first master, which therefore does not match the cluster configuration.
  const auto master_addrs_str =
      HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs());
  const auto incorrect_master_addrs_str =
      cluster_->master(0)->bound_rpc_hostport().ToString();

  string out;
  string err;
  Status s = RunKuduTool({
    "master",
    "authz_cache",
    "refresh",
    incorrect_master_addrs_str,
  }, &out, &err);
  ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
  const auto ref_err_msg = Substitute(
      "Invalid argument: list of master addresses provided ($0) "
      "does not match the actual cluster configuration ($1)",
      incorrect_master_addrs_str, master_addrs_str);
  ASSERT_STR_CONTAINS(err, ref_err_msg);

  // However, the '--force' option makes it possible to run the tool even
  // if the specified list of master addresses does not match the actual
  // list of master addresses in the cluster.
  out.clear();
  err.clear();
  s = RunKuduTool({
    "master",
    "authz_cache",
    "refresh",
    "--force",
    incorrect_master_addrs_str,
  }, &out, &err);
  // Report the tool's output on failure, like the other tool checks do.
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
}
TEST_F(AdminCliTest, TestAuthzResetCacheNotAuthorized) {
  // Make a non-existent user the only superuser: the OS user under which the
  // tool is invoked is then neither a superuser nor an admin.
  vector<string> superuser_flags{ "--superuser_acl=no-such-user" };
  NO_FATALS(BuildAndStart({}, superuser_flags));

  // Resetting the authz cache must therefore be rejected by the master.
  string tool_out;
  string tool_err;
  const string master_addr = cluster_->master()->bound_rpc_hostport().ToString();
  Status s = RunKuduTool({ "master", "authz_cache", "refresh", master_addr },
                         &tool_out, &tool_err);
  ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, tool_out, tool_err);
  ASSERT_STR_CONTAINS(
      tool_err,
      "Remote error: Not authorized: unauthorized access to method: RefreshAuthzCache");
}
TEST_F(AdminCliTest, TestExtraConfig) {
  NO_FATALS(BuildAndStart());
  const string master_address = cluster_->master()->bound_rpc_addr().ToString();

  // Runs 'kudu table get_extra_configs <master> <table> [extra_args...]' for
  // the test table, asserts that the tool succeeded, and stores the tool's
  // standard output into 'out'.
  const auto get_extra_configs = [&](const vector<string>& extra_args, string* out) {
    vector<string> args = { "table", "get_extra_configs", master_address, kTableId };
    args.insert(args.end(), extra_args.begin(), extra_args.end());
    string tool_out;
    string tool_err;
    Status s = RunKuduTool(args, &tool_out, &tool_err);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, tool_out, tool_err);
    *out = std::move(tool_out);
  };

  // Gets extra-configs when no extra config is set.
  {
    string out;
    NO_FATALS(get_extra_configs({}, &out));
    ASSERT_EQ(out, " Configuration | Value\n"
                   "---------------+-------\n");
  }
  // Sets "kudu.table.history_max_age_sec" to 3600.
  {
    ASSERT_TOOL_OK(
      "table",
      "set_extra_config",
      master_address,
      kTableId,
      "kudu.table.history_max_age_sec",
      "3600"
    );
  }
  // Gets all extra-configs.
  {
    string out;
    NO_FATALS(get_extra_configs({}, &out));
    ASSERT_STR_CONTAINS(out, "kudu.table.history_max_age_sec | 3600");
  }
  // Gets the specified extra-config; the configuration exists.
  {
    string out;
    NO_FATALS(get_extra_configs({ "-config_names=kudu.table.history_max_age_sec" }, &out));
    ASSERT_STR_CONTAINS(out, "kudu.table.history_max_age_sec | 3600");
  }
  // Gets a duplicated extra-config; it should be reported only once.
  {
    string out;
    NO_FATALS(get_extra_configs(
        { "-config_names=kudu.table.history_max_age_sec,kudu.table.history_max_age_sec" },
        &out));
    ASSERT_EQ(out, " Configuration | Value\n"
                   "--------------------------------+-------\n"
                   " kudu.table.history_max_age_sec | 3600\n");
  }
  // Gets the specified extra-config; the configuration doesn't exist.
  {
    string out;
    NO_FATALS(get_extra_configs({ "-config_names=foobar" }, &out));
    ASSERT_EQ(out, " Configuration | Value\n"
                   "---------------+-------\n");
  }
}
// Inserts 'num_rows' rows into the table 'table_name' with keys in the
// half-open range [start_key, start_key + num_rows). The other two columns
// ('int_val' and 'string_val') are set to fixed values.
//
// When applying a row fails with an IOError (e.g. the key falls outside every
// range partition of the table), the session's single pending error is
// inspected:
//   - if it is NotFound, Status::NotFound naming the offending key is returned
//     (this is the outcome the out-of-range tests expect);
//   - otherwise a non-fatal test failure is recorded and that per-row error
//     is returned as-is instead of being masked as NotFound.
// Any other failure is propagated unchanged.
static Status InsertTestRows(const client::sp::shared_ptr<KuduClient>& client,
                             const string& table_name,
                             int start_key,
                             int num_rows) {
  client::sp::shared_ptr<KuduTable> table;
  RETURN_NOT_OK(client->OpenTable(table_name, &table));
  auto session = client->NewSession();
  for (int i = start_key; i < num_rows + start_key; ++i) {
    unique_ptr<KuduInsert> insert(table->NewInsert());
    auto* row = insert->mutable_row();
    RETURN_NOT_OK(row->SetInt32("key", i));
    RETURN_NOT_OK(row->SetInt32("int_val", 12345));
    RETURN_NOT_OK(row->SetString("string_val", "hello"));
    Status result = session->Apply(insert.release());
    if (result.IsIOError()) {
      vector<kudu::client::KuduError*> errors;
      ElementDeleter drop(&errors);
      bool overflowed = false;
      session->GetPendingErrors(&errors, &overflowed);
      EXPECT_FALSE(overflowed);
      EXPECT_EQ(1, errors.size());
      // EXPECT_EQ() above is non-fatal: never index into an empty vector.
      if (errors.empty()) {
        return result;
      }
      // Copy the status before 'drop' frees the KuduError objects.
      const Status row_status = errors[0]->status();
      EXPECT_TRUE(row_status.IsNotFound()) << row_status.ToString();
      if (!row_status.IsNotFound()) {
        return row_status;
      }
      return Status::NotFound(Substitute("No range partition covers the insert value $0", i));
    }
    RETURN_NOT_OK(result);
  }
  RETURN_NOT_OK(session->Flush());
  RETURN_NOT_OK(session->Close());
  return Status::OK();
}
TEST_F(AdminCliTest, TestAddAndDropUnboundedPartition) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());
  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();
  client::sp::shared_ptr<KuduTable> table;

  // At first, the range partition is unbounded, so any row can be inserted.
  // Insert 100 rows with keys in [0, 100): now there are 100 rows.
  // InsertTestRows() reports failures via its Status, so check it.
  constexpr int kNumRows = 100;
  ASSERT_OK(InsertTestRows(client_, kTableId, 0, kNumRows));
  ASSERT_OK(client_->OpenTable(kTableId, &table));
  ASSERT_EQ(kNumRows, CountTableRows(table.get()));

  // For a table with an unbounded range partition, adding any range partition
  // would conflict with it, so the unbounded range partition must be dropped
  // first. Since the partition is unbounded, dropping it drops all rows:
  // afterwards there are 0 rows left.
  string stdout, stderr;
  Status s = RunKuduTool({
    "table",
    "drop_range_partition",
    master_addr,
    kTableId,
    "[]",
    "[]",
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(client_->OpenTable(kTableId, &table));
    ASSERT_EQ(0, CountTableRows(table.get()));
  });

  // Since the unbounded partition has been dropped, a new unbounded range
  // partition can now be added to the table.
  stdout.clear();
  stderr.clear();
  s = RunKuduTool({
    "table",
    "add_range_partition",
    master_addr,
    kTableId,
    "[]",
    "[]",
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

  // Insert 100 rows with keys in [0, 100): now there are 100 rows again.
  ASSERT_OK(InsertTestRows(client_, kTableId, 0, kNumRows));
  ASSERT_OK(client_->OpenTable(kTableId, &table));
  ASSERT_EQ(kNumRows, CountTableRows(table.get()));
}
TEST_F(AdminCliTest, TestAddAndDropRangePartition) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());

  // The kTableId's range partition is unbounded, so we need to create another
  // table to add or drop range partitions.
  const string kTestTableName = "TestTable0";
  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();

  // Build the schema.
  KuduSchema schema;
  KuduSchemaBuilder builder;
  builder.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull();
  builder.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
  builder.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->NotNull();
  builder.SetPrimaryKey({ "key" });
  ASSERT_OK(builder.Build(&schema));

  // Set up partitioning and create the table with the single range
  // partition [0, 100).
  unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
  ASSERT_OK(lower_bound->SetInt32("key", 0));
  unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
  ASSERT_OK(upper_bound->SetInt32("key", 100));
  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  ASSERT_OK(table_creator->table_name(kTestTableName)
      .schema(&schema)
      .set_range_partition_columns({ "key" })
      .add_range_partition(lower_bound.release(), upper_bound.release())
      .num_replicas(FLAGS_num_replicas)
      .Create());

  client::sp::shared_ptr<KuduTable> table;

  // Runs 'kudu table <action>' against the test table, where 'action' is
  // either "add_range_partition" or "drop_range_partition", and returns the
  // tool's status. The tool's output is discarded.
  const auto alter_range_partition_using_CLI = [&](const string& action,
                                                   const string& lower_json,
                                                   const string& upper_json,
                                                   const string& lower_type,
                                                   const string& upper_type) {
    string out;
    string error;
    return RunKuduTool({
      "table",
      action,
      master_addr,
      kTestTableName,
      lower_json,
      upper_json,
      Substitute("-lower_bound_type=$0", lower_type),
      Substitute("-upper_bound_type=$0", upper_type),
    }, &out, &error);
  };

  // Adds the range partition [lower_json, upper_json) with the given bound
  // types (an empty type is replaced with INCLUSIVE_BOUND for the lower and
  // EXCLUSIVE_BOUND for the upper bound), inserts 'num_rows_to_insert' rows
  // starting at key 'start_row_to_insert' and checks they are all there,
  // checks that each key in 'out_of_range_rows_to_insert' is rejected as
  // NotFound, then drops the range partition and waits until no rows are left.
  const auto check_bounds = [&](const string& lower_json,
                                const string& upper_json,
                                const string& lower_bound_type,
                                const string& upper_bound_type,
                                int start_row_to_insert,
                                int num_rows_to_insert,
                                const vector<int>& out_of_range_rows_to_insert) {
    const string lower_type =
        lower_bound_type.empty() ? "INCLUSIVE_BOUND" : lower_bound_type;
    const string upper_type =
        upper_bound_type.empty() ? "EXCLUSIVE_BOUND" : upper_bound_type;

    // Add the range partition.
    ASSERT_OK(alter_range_partition_using_CLI("add_range_partition",
                                              lower_json, upper_json,
                                              lower_type, upper_type));

    // Insert 'num_rows_to_insert' rows into the table.
    ASSERT_OK(InsertTestRows(client_, kTestTableName, start_row_to_insert,
                             num_rows_to_insert));
    ASSERT_OK(client_->OpenTable(kTestTableName, &table));
    ASSERT_EQ(num_rows_to_insert, CountTableRows(table.get()));

    // Rows outside the range partition must be rejected with NotFound.
    for (int key : out_of_range_rows_to_insert) {
      const Status s = InsertTestRows(client_, kTestTableName, key, 1);
      EXPECT_TRUE(s.IsNotFound()) << "key " << key << ": " << s.ToString();
    }

    // Drop the range partition and verify no rows are left.
    ASSERT_OK(alter_range_partition_using_CLI("drop_range_partition",
                                              lower_json, upper_json,
                                              lower_type, upper_type));
    ASSERT_EVENTUALLY([&]() {
      ASSERT_OK(client_->OpenTable(kTestTableName, &table));
      ASSERT_EQ(0, CountTableRows(table.get()));
    });
  };

  {
    // Test specifying the range bound types in lower case.
    // Drop the range partition [0, 100) added when creating the table. Before
    // that, insert 100 rows with keys 0-99: now there are 100 rows.
    ASSERT_OK(InsertTestRows(client_, kTestTableName, 0, 100));
    ASSERT_OK(client_->OpenTable(kTestTableName, &table));
    ASSERT_EQ(100, CountTableRows(table.get()));
    // Drop range partition [0, 100) using the CLI: now there are 0 rows left.
    ASSERT_OK(alter_range_partition_using_CLI("drop_range_partition",
                                              "[0]", "[100]",
                                              "inclusive_bound", "exclusive_bound"));
    ASSERT_EVENTUALLY([&]() {
      ASSERT_OK(client_->OpenTable(kTestTableName, &table));
      ASSERT_EQ(0, CountTableRows(table.get()));
    });
  }
  // Each check_bounds() call below uses ASSERT_* internally, so it is wrapped
  // with NO_FATALS to stop the test on the first fatal failure.
  {
    // (INCLUSIVE_BOUND, EXCLUSIVE_BOUND): add [100, 200). Insert 100 rows with
    // keys 100-199; keys 99 and 200 are out of range.
    NO_FATALS(check_bounds("[100]", "[200]", "INCLUSIVE_BOUND", "EXCLUSIVE_BOUND",
                           100, 100, { 99, 200 }));
  }
  {
    // (INCLUSIVE_BOUND, INCLUSIVE_BOUND): add [100, 200]. Insert 101 rows with
    // keys 100-200; keys 99 and 201 are out of range.
    NO_FATALS(check_bounds("[100]", "[200]", "INCLUSIVE_BOUND", "INCLUSIVE_BOUND",
                           100, 101, { 99, 201 }));
  }
  {
    // (EXCLUSIVE_BOUND, INCLUSIVE_BOUND): add (100, 200]. Insert 100 rows with
    // keys 101-200; keys 100 and 201 are out of range.
    NO_FATALS(check_bounds("[100]", "[200]", "EXCLUSIVE_BOUND", "INCLUSIVE_BOUND",
                           101, 100, { 100, 201 }));
  }
  {
    // (EXCLUSIVE_BOUND, EXCLUSIVE_BOUND): add (100, 200). Insert 99 rows with
    // keys 101-199; keys 100 and 200 are out of range.
    NO_FATALS(check_bounds("[100]", "[200]", "EXCLUSIVE_BOUND", "EXCLUSIVE_BOUND",
                           101, 99, { 100, 200 }));
  }
  {
    // (INCLUSIVE_BOUND, unbounded): add [1, +inf). Insert 100 rows with keys
    // 1-100; key 0 is out of range.
    NO_FATALS(check_bounds("[1]", "[]", "INCLUSIVE_BOUND", "",
                           1, 100, { 0 }));
  }
  {
    // (EXCLUSIVE_BOUND, unbounded): add (1, +inf). Insert 100 rows with keys
    // 2-101; key 1 is out of range.
    NO_FATALS(check_bounds("[1]", "[]", "EXCLUSIVE_BOUND", "",
                           2, 100, { 1 }));
  }
  {
    // (unbounded, INCLUSIVE_BOUND): add (-inf, 100]. Insert 100 rows with keys
    // 1-100; key 101 is out of range.
    NO_FATALS(check_bounds("[]", "[100]", "", "INCLUSIVE_BOUND",
                           1, 100, { 101 }));
  }
  {
    // (unbounded, EXCLUSIVE_BOUND): add (-inf, 100). Insert 100 rows with keys
    // 0-99; key 100 is out of range.
    NO_FATALS(check_bounds("[]", "[100]", "", "EXCLUSIVE_BOUND",
                           0, 100, { 100 }));
  }
}
TEST_F(AdminCliTest, TestAddAndDropRangePartitionWithWrongParameters) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());
  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();
  const string kTestTableName = "TestTable1";

  // Build the schema.
  KuduSchema schema;
  KuduSchemaBuilder builder;
  builder.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull();
  builder.SetPrimaryKey({ "key" });
  ASSERT_OK(builder.Build(&schema));

  // Create the table with the single range partition [0, 1).
  unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
  ASSERT_OK(lower_bound->SetInt32("key", 0));
  unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
  ASSERT_OK(upper_bound->SetInt32("key", 1));
  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  ASSERT_OK(table_creator->table_name(kTestTableName)
      .schema(&schema)
      .set_range_partition_columns({ "key" })
      .add_range_partition(lower_bound.release(), upper_bound.release())
      .num_replicas(FLAGS_num_replicas)
      .Create());

  // Runs 'kudu table add_range_partition' with the given (bad) arguments and
  // asserts that the tool fails with a RuntimeError whose stderr contains
  // 'error'. On failure, the tool's stdout/stderr are reported.
  const auto check_bad_input = [&](const string& lower_bound_json,
                                   const string& upper_bound_json,
                                   const string& lower_bound_type,
                                   const string& upper_bound_type,
                                   const string& error) {
    string out, err;
    Status s = RunKuduTool({
      "table",
      "add_range_partition",
      master_addr,
      kTestTableName,
      lower_bound_json,
      upper_bound_json,
      Substitute("-lower_bound_type=$0", lower_bound_type),
      Substitute("-upper_bound_type=$0", upper_bound_type),
    }, &out, &err);
    ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
    ASSERT_STR_CONTAINS(err, error);
  };

  // Test providing wrong type of range lower bound type, it will return error.
  NO_FATALS(check_bad_input("[]", "[]", "test_lower_bound_type",
                            "EXCLUSIVE_BOUND",
                            "wrong type of range lower bound"));
  // Test providing wrong type of range upper bound type, it will return error.
  NO_FATALS(check_bad_input("[]", "[]", "INCLUSIVE_BOUND",
                            "test_upper_bound_type",
                            "wrong type of range upper bound"));
  // Test providing wrong number of range values, it will return error.
  NO_FATALS(check_bad_input("[1,2]", "[3]", "INCLUSIVE_BOUND",
                            "EXCLUSIVE_BOUND",
                            "wrong number of range columns specified: "
                            "expected 1 but received 2"));
  // Test providing wrong type of range partition key: string instead of int,
  // it will return error.
  NO_FATALS(check_bad_input("[\"hello\"]", "[\"world\"]", "INCLUSIVE_BOUND",
                            "EXCLUSIVE_BOUND",
                            "unable to parse value"));
  // Test providing incomplete json array of range bound, it will return error.
  NO_FATALS(check_bad_input("[", "[2]", "INCLUSIVE_BOUND",
                            "EXCLUSIVE_BOUND",
                            "JSON text is corrupt"));
  // Test providing wrong json array format of range bound, it will return error.
  NO_FATALS(check_bad_input("[1,]", "[2]", "INCLUSIVE_BOUND",
                            "EXCLUSIVE_BOUND",
                            "JSON text is corrupt"));
  // Test providing wrong JSON that's not an array, it will return error.
  NO_FATALS(check_bad_input(
      "{ \"key\" : 1}", "{\"key\" : 2 }", "INCLUSIVE_BOUND",
      "EXCLUSIVE_BOUND",
      "wrong type during field extraction: expected object array"));
}
TEST_F(AdminCliTest, TestAddAndDropRangePartitionForMultipleRangeColumnsTable) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());
  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();
  const string kTestTableName = "TestTable2";
  {
    // Build the schema.
    KuduSchema schema;
    KuduSchemaBuilder builder;
    builder.AddColumn("key_INT8")->Type(KuduColumnSchema::INT8)->NotNull();
    builder.AddColumn("key_INT16")->Type(KuduColumnSchema::INT16)->NotNull();
    builder.AddColumn("key_INT32")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_INT64")->Type(KuduColumnSchema::INT64)->NotNull();
    builder.AddColumn("key_UNIXTIME_MICROS")->
        Type(KuduColumnSchema::UNIXTIME_MICROS)->NotNull();
    builder.AddColumn("key_BINARY")->Type(KuduColumnSchema::BINARY)->NotNull();
    builder.AddColumn("key_STRING")->Type(KuduColumnSchema::STRING)->NotNull();
    builder.SetPrimaryKey({ "key_INT8", "key_INT16", "key_INT32",
                            "key_INT64", "key_UNIXTIME_MICROS",
                            "key_BINARY", "key_STRING" });
    ASSERT_OK(builder.Build(&schema));

    // Init the range partition and create the table.
    unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
    ASSERT_OK(lower_bound->SetInt8("key_INT8", 0));
    ASSERT_OK(lower_bound->SetInt16("key_INT16", 1));
    ASSERT_OK(lower_bound->SetInt32("key_INT32", 2));
    ASSERT_OK(lower_bound->SetInt64("key_INT64", 3));
    ASSERT_OK(lower_bound->SetUnixTimeMicros("key_UNIXTIME_MICROS", 4));
    ASSERT_OK(lower_bound->SetBinaryCopy("key_BINARY", "a"));
    ASSERT_OK(lower_bound->SetString("key_STRING", "b"));
    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
    ASSERT_OK(upper_bound->SetInt8("key_INT8", 5));
    ASSERT_OK(upper_bound->SetInt16("key_INT16", 6));
    ASSERT_OK(upper_bound->SetInt32("key_INT32", 7));
    ASSERT_OK(upper_bound->SetInt64("key_INT64", 8));
    ASSERT_OK(upper_bound->SetUnixTimeMicros("key_UNIXTIME_MICROS", 9));
    ASSERT_OK(upper_bound->SetBinaryCopy("key_BINARY", "c"));
    ASSERT_OK(upper_bound->SetString("key_STRING", "d"));
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    ASSERT_OK(table_creator->table_name(kTestTableName)
        .schema(&schema)
        .set_range_partition_columns({ "key_INT8", "key_INT16", "key_INT32",
                                       "key_INT64", "key_UNIXTIME_MICROS",
                                       "key_BINARY", "key_STRING" })
        .add_range_partition(lower_bound.release(), upper_bound.release())
        .num_replicas(FLAGS_num_replicas)
        .Create());
  }

  // Bounds of the range partition that is added and then dropped via the CLI.
  // Both operations use these constants, so the drop is guaranteed to target
  // exactly the range that was added.
  const string kLowerBoundJson = "[10, 11, 12, 13, 14, \"e\", \"f\"]";
  const string kUpperBoundJson = "[15, 16, 17, 18, 19, \"g\", \"h\"]";

  // Add the range partition using the CLI.
  string stdout, stderr;
  Status s = RunKuduTool({
    "table",
    "add_range_partition",
    master_addr,
    kTestTableName,
    kLowerBoundJson,
    kUpperBoundJson,
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

  client::sp::shared_ptr<KuduTable> table;
  ASSERT_OK(client_->OpenTable(kTestTableName, &table));
  {
    // Insert a test row equal to the new range's lower bound.
    auto session = client_->NewSession();
    unique_ptr<KuduInsert> insert(table->NewInsert());
    auto* row = insert->mutable_row();
    ASSERT_OK(row->SetInt8("key_INT8", 10));
    ASSERT_OK(row->SetInt16("key_INT16", 11));
    ASSERT_OK(row->SetInt32("key_INT32", 12));
    ASSERT_OK(row->SetInt64("key_INT64", 13));
    ASSERT_OK(row->SetUnixTimeMicros("key_UNIXTIME_MICROS", 14));
    ASSERT_OK(row->SetBinaryCopy("key_BINARY", "e"));
    ASSERT_OK(row->SetString("key_STRING", "f"));
    ASSERT_OK(session->Apply(insert.release()));
    ASSERT_OK(session->Flush());
    ASSERT_OK(session->Close());
  }

  // There is 1 row in the table now.
  ASSERT_OK(client_->OpenTable(kTestTableName, &table));
  ASSERT_EQ(1, CountTableRows(table.get()));

  // Drop the same range partition using the CLI.
  stdout.clear();
  stderr.clear();
  s = RunKuduTool({
    "table",
    "drop_range_partition",
    master_addr,
    kTestTableName,
    kLowerBoundJson,
    kUpperBoundJson,
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

  // There are 0 rows left.
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(client_->OpenTable(kTestTableName, &table));
    ASSERT_EQ(0, CountTableRows(table.get()));
  });
}
TEST_F(AdminCliTest, AddAndDropRangeWithCustomHashSchema) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;
  NO_FATALS(BuildAndStart());

  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
  constexpr const char* const kTestTableName = "custom_hash_schemas";
  constexpr const char* const kC0 = "c0";
  constexpr const char* const kC1 = "c1";
  constexpr const char* const kC2 = "c2";

  // Create the table with a single left-unbounded range partition (c0 < 0)
  // that uses the table-wide hash schema: HASH(c1) PARTITIONS 2.
  {
    KuduSchema schema;
    KuduSchemaBuilder schema_builder;
    schema_builder.AddColumn(kC0)->Type(KuduColumnSchema::INT8)->NotNull();
    schema_builder.AddColumn(kC1)->Type(KuduColumnSchema::INT16)->NotNull();
    schema_builder.AddColumn(kC2)->Type(KuduColumnSchema::STRING);
    schema_builder.SetPrimaryKey({ kC0, kC1 });
    ASSERT_OK(schema_builder.Build(&schema));

    unique_ptr<KuduPartialRow> range_lower(schema.NewRow());
    unique_ptr<KuduPartialRow> range_upper(schema.NewRow());
    ASSERT_OK(range_upper->SetInt8(kC0, 0));
    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
    ASSERT_OK(creator->table_name(kTestTableName)
        .schema(&schema)
        .set_range_partition_columns({ kC0 })
        .add_hash_partitions({ kC1 }, 2)
        .add_range_partition(range_lower.release(), range_upper.release())
        .num_replicas(FLAGS_num_replicas)
        .Create());
  }

  string stdout;
  string stderr;

  // Runs the kudu CLI tool with 'args' and asserts that it succeeded.
  const auto run_tool = [&](const vector<string>& args) {
    const auto s = RunKuduTool(args, &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
  };

  // Runs 'kudu table describe' on the test table and asserts that its output
  // contains every string in 'expected', checked in order.
  const auto describe_contains = [&](const vector<string>& expected) {
    NO_FATALS(run_tool({ "table", "describe", master_addr, kTestTableName }));
    for (const auto& fragment : expected) {
      ASSERT_STR_CONTAINS(stdout, fragment);
    }
  };

  // Inserts the row (c0, c1, c2) into the test table and asserts that the
  // table then contains 'expected_row_count' rows.
  const auto insert_row = [&](int8_t c0, int16_t c1, const char* c2,
                              int expected_row_count) {
    client::sp::shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(kTestTableName, &table));
    auto session = client_->NewSession();
    unique_ptr<KuduInsert> insert(table->NewInsert());
    auto* row = insert->mutable_row();
    ASSERT_OK(row->SetInt8(kC0, c0));
    ASSERT_OK(row->SetInt16(kC1, c1));
    ASSERT_OK(row->SetString(kC2, c2));
    ASSERT_OK(session->Apply(insert.release()));
    ASSERT_OK(session->Flush());
    ASSERT_EQ(expected_row_count, CountTableRows(table.get()));
  };

  // Add the range [0, 1) with a custom hash schema, then write into it.
  {
    constexpr const char* const kHashSchemaJson = R"*({
"hash_schema": [
{ "columns": ["c1"], "num_buckets": 5, "seed": 8 }
]
})*";
    NO_FATALS(run_tool({
      "table",
      "add_range_partition",
      master_addr,
      kTestTableName,
      "[0]",
      "[1]",
      Substitute("--hash_schema=$0", kHashSchemaJson),
    }));
  }
  NO_FATALS(describe_contains({ "PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5" }));
  NO_FATALS(insert_row(0, 0, "0", 1));

  // Add the right-unbounded range [1, +inf) with another custom hash schema,
  // then write into it.
  {
    constexpr const char* const kHashSchemaJson = R"*({
"hash_schema": [ { "columns": ["c0", "c1"], "num_buckets": 3 } ] })*";
    NO_FATALS(run_tool({
      "table",
      "add_range_partition",
      master_addr,
      kTestTableName,
      "[1]",
      "[]",
      Substitute("--hash_schema=$0", kHashSchemaJson),
    }));
  }
  NO_FATALS(describe_contains({
    "PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5",
    "PARTITION 1 <= VALUES HASH(c0, c1) PARTITIONS 3",
  }));
  NO_FATALS(insert_row(10, 10, "10", 2));

  // Drop every range, one at a time.
  const vector<pair<string, string>> kRangesStr = {
    {"", "0"}, {"0", "1"}, {"1", ""},
  };
  for (const auto& range : kRangesStr) {
    SCOPED_TRACE(Substitute("range ['$0', '$1')", range.first, range.second));
    NO_FATALS(run_tool({
      "table",
      "drop_range_partition",
      master_addr,
      kTestTableName,
      Substitute("[$0]", range.first),
      Substitute("[$0]", range.second),
    }));
  }

  // With all the ranges gone, no rows should be left.
  client::sp::shared_ptr<KuduTable> table;
  ASSERT_OK(client_->OpenTable(kTestTableName, &table));
  ASSERT_EVENTUALLY([&]() {
    ASSERT_EQ(0, CountTableRows(table.get()));
  });
}
namespace {

// Name passed as '--sasl_protocol_name' by RebuildMasterCmd() when
// 'is_secure' is true.
constexpr const char* kPrincipal = "oryx";

// Returns the argument list for 'kudu master unsafe_rebuild' run against the
// data and WAL directories of 'cluster's master, followed by the RPC
// addresses of the first 'tserver_num' tablet servers of 'cluster'.
//
// Optional arguments:
//   - is_secure:           add '--sasl_protocol_name=<kPrincipal>'.
//   - log_to_stderr:       add '--logtostderr'.
//   - tables:              if non-empty, add '-tables=<tables>'.
//   - default_replica_num: value for '--default_num_replicas'.
// '--encrypt_data_at_rest=true' is added when the default Env has encryption
// enabled. CHECK-fails unless 0 < tserver_num <= cluster.num_tablet_servers().
vector<string> RebuildMasterCmd(const ExternalMiniCluster& cluster,
                                int tserver_num,
                                bool is_secure,
                                bool log_to_stderr = false,
                                const string& tables = "",
                                int default_replica_num = 1) {
  CHECK_GT(tserver_num, 0);
  CHECK_LE(tserver_num, cluster.num_tablet_servers());
  vector<string> command = {
    "master",
    "unsafe_rebuild",
    "-fs_data_dirs",
    JoinStrings(cluster.master()->data_dirs(), ","),
    "-fs_wal_dir",
    cluster.master()->wal_dir(),
  };
  if (!tables.empty()) {
    command.emplace_back(Substitute("-tables=$0", tables));
  }
  if (Env::Default()->IsEncryptionEnabled()) {
    command.emplace_back("--encrypt_data_at_rest=true");
  }
  if (log_to_stderr) {
    command.emplace_back("--logtostderr");
  }
  if (is_secure) {
    command.emplace_back(Substitute("--sasl_protocol_name=$0", kPrincipal));
  }
  for (int i = 0; i < tserver_num; i++) {
    auto* ts = cluster.tablet_server(i);
    command.emplace_back(ts->bound_rpc_hostport().ToString());
  }
  command.emplace_back(Substitute("--default_num_replicas=$0", default_replica_num));
  return command;
}

// Creates the table 'table_name' with 'num_replicas' replicas in 'cluster'
// using a TestWorkload, and keeps the workload writing until it reports at
// least 'num_rows' inserted rows.
// NOTE(review): the wait loop has no deadline; if the workload stalls, only
// the test's global timeout will stop it.
void MakeTestTable(const string& table_name,
                   int num_rows,
                   int num_replicas,
                   ExternalMiniCluster* cluster) {
  TestWorkload workload(cluster);
  workload.set_table_name(table_name);
  workload.set_num_replicas(num_replicas);
  workload.Setup();
  workload.Start();
  while (workload.rows_inserted() < num_rows) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  workload.StopAndJoin();
}

} // anonymous namespace
// Test that the master rebuild tool refuses to run when the master's
// directories are not empty, and succeeds once they have been deleted.
TEST_F(AdminCliTest, TestRebuildMasterWhenNonEmpty) {
  FLAGS_num_tablet_servers = 3;
  NO_FATALS(BuildAndStart({}, {}, {}, /*create_table*/false));
  constexpr const char* kTable = "default.foo";
  NO_FATALS(MakeTestTable(kTable, /*num_rows*/10, /*num_replicas*/1, cluster_.get()));
  NO_FATALS(cluster_->master()->Shutdown());

  string stdout;
  string stderr;
  Status s = RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers,
                                          /*is_secure*/false, /*log_to_stderr*/true),
                         &stdout, &stderr);
  // Report the tool's output, not just the status, if this fails.
  ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, stdout, stderr);
  ASSERT_STR_CONTAINS(stderr, "must be empty");

  // Delete the contents of the old master from disk. This should allow the
  // tool to run.
  ASSERT_OK(cluster_->master()->DeleteFromDisk());
  stdout.clear();
  stderr.clear();
  s = RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers,
                                   /*is_secure*/false, /*log_to_stderr*/true),
                  &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
  ASSERT_STR_NOT_CONTAINS(stderr, "must be empty");
  ASSERT_STR_CONTAINS(stdout,
                      "Rebuilt from 3 tablet servers, of which 0 had errors");
  ASSERT_STR_CONTAINS(stdout, "Rebuilt from 1 replicas, of which 0 had errors");
}
void delete_table_in_syscatalog(const string& wal_dir,
const std::vector<std::string>& data_dirs,
const string& del_table_name) {
MasterOptions opts;
opts.fs_opts.wal_root = wal_dir;
opts.fs_opts.data_roots = data_dirs;
Master master(opts);
ASSERT_OK(master.Init());
SysCatalogTable sys_catalog(&master, &NoOpCb);
ASSERT_OK(sys_catalog.Load(master.fs_manager()));
// Get table from sys_catalog.
const auto kLeaderTimeout = MonoDelta::FromSeconds(10);
ASSERT_OK(sys_catalog.tablet_replica()->consensus()->WaitUntilLeader(kLeaderTimeout));
TableInfoLoader table_info_loader;
sys_catalog.VisitTables(&table_info_loader);
scoped_refptr<TableInfo> table_info;
for (const auto& table : table_info_loader.tables) {
table->metadata().ReadLock();
string table_name = table->metadata().state().name();
table->metadata().ReadUnlock();
if (table_name == del_table_name) {
table_info = table;
break;
}
}
ASSERT_NE(nullptr, table_info);
// Get tablet from sys_catalog.
TabletInfoLoader tablet_info_loader;
sys_catalog.VisitTablets(&tablet_info_loader);
vector<scoped_refptr<TabletInfo>> tablets;
for (const auto& tablet : tablet_info_loader.tablets) {
tablet->metadata().ReadLock();
if (tablet->metadata().state().pb.table_id() == table_info->id())
tablets.push_back(tablet);
tablet->metadata().ReadUnlock();
}
ASSERT_GT(tablets.size(), 0);
// Delete one table and it's tablets.
TableMetadataLock l_table(table_info.get(), LockMode::WRITE);
TabletMetadataGroupLock l_tablets(LockMode::RELEASED);
l_tablets.AddMutableInfos(tablets);
l_tablets.Lock(LockMode::WRITE);
SysCatalogTable::Actions actions;
actions.table_to_delete = table_info;
actions.tablets_to_delete = tablets;
ASSERT_OK(sys_catalog.Write(actions));
NO_FATALS(sys_catalog.Shutdown());
NO_FATALS(master.Shutdown());
}
// Rebuild only a subset of the tables, not all of them.
TEST_F(AdminCliTest, TestRebuildTables) {
  FLAGS_num_tablet_servers = 3;
  NO_FATALS(BuildAndStart({}, {}, {}, /*create_table*/false));

  // Create 3 tables.
  constexpr const char* kTable1 = "TestTable";
  NO_FATALS(MakeTestTable(kTable1, /*num_rows*/10, /*num_replicas*/1, cluster_.get()));
  constexpr const char* kTable2 = "TestTable1";
  NO_FATALS(MakeTestTable(kTable2, /*num_rows*/10, /*num_replicas*/1, cluster_.get()));
  constexpr const char* kTable3 = "TestTable2";
  NO_FATALS(MakeTestTable(kTable3, /*num_rows*/10, /*num_replicas*/1, cluster_.get()));
  const string part_tables = Substitute("$0,$1", kTable1, kTable2);
  NO_FATALS(cluster_->master()->Shutdown());

  // Shuts down all tablet servers, restarts every stopped node of the
  // cluster, waits for the tablet servers and replicas, and verifies that
  // the cluster is healthy.
  const auto restart_and_verify_cluster = [&]() {
    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
      cluster_->tablet_server(i)->Shutdown();
    }
    ASSERT_OK(cluster_->Restart());
    NO_FATALS(WaitForTSAndReplicas());
    ClusterVerifier cv(cluster_.get());
    NO_FATALS(cv.CheckCluster());
  };

  // Rebuild 2 tables in update mode.
  {
    string stdout;
    string stderr;
    Status s = RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers,
                                            /*is_secure*/false, /*log_to_stderr*/true,
                                            part_tables, /*default_replica_num*/1),
                           &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_STR_CONTAINS(stdout,
                        "Rebuilt from 3 tablet servers, of which 0 had errors");
    ASSERT_STR_CONTAINS(stdout, "Rebuilt from 2 replicas, of which 0 had errors");
  }
  NO_FATALS(restart_and_verify_cluster());
  NO_FATALS(cluster_->master()->Shutdown());

  // Delete kTable1 from the sys catalog. The helper uses ASSERT_*, so stop
  // the test if it fails.
  NO_FATALS(delete_table_in_syscatalog(cluster_->master()->wal_dir(),
                                       cluster_->master()->data_dirs(),
                                       kTable1));

  // Rebuild kTable1 in add mode.
  {
    string stdout;
    string stderr;
    Status s = RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers,
                                            /*is_secure*/false, /*log_to_stderr*/true,
                                            kTable1),
                           &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_STR_NOT_CONTAINS(stderr, "must be empty");
    ASSERT_STR_CONTAINS(stdout,
                        "Rebuilt from 3 tablet servers, of which 0 had errors");
    ASSERT_STR_CONTAINS(stdout, "Rebuilt from 1 replicas, of which 0 had errors");
  }
  NO_FATALS(restart_and_verify_cluster());
}
// Verifies that the master rebuilder ignores tombstoned replicas: the table's
// only replica is tombstoned, so the rebuilt master must not know the table.
TEST_F(AdminCliTest, TestRebuildMasterWithTombstones) {
  FLAGS_num_tablet_servers = 3;
  NO_FATALS(BuildAndStart({}, {}, {}, /*create_table*/false));
  constexpr const char* kTable = "default.foo";
  const MonoDelta kOpTimeout = MonoDelta::FromSeconds(10);
  NO_FATALS(MakeTestTable(kTable, /*num_rows*/10, /*num_replicas*/1, cluster_.get()));

  TabletServerMap ts_map;
  ASSERT_OK(CreateTabletServerMap(cluster_->master_proxy(), cluster_->messenger(), &ts_map));
  ValueDeleter deleter(&ts_map);

  // Locate the tablet server that hosts the single running replica.
  string tablet_id;
  TServerDetails* hosting_ts = nullptr;
  for (const auto& entry : ts_map) {
    hosting_ts = entry.second;
    vector<string> running_ids;
    ASSERT_OK(ListRunningTabletIds(hosting_ts, kOpTimeout, &running_ids));
    if (running_ids.empty()) {
      continue;
    }
    tablet_id = running_ids.front();
    break;
  }
  ASSERT_FALSE(tablet_id.empty());

  // Tombstone that replica and wait until it has stopped.
  ASSERT_OK(itest::DeleteTablet(hosting_ts, tablet_id, tablet::TABLET_DATA_TOMBSTONED,
                                kOpTimeout));
  ASSERT_OK(itest::WaitUntilTabletInState(
      hosting_ts, tablet_id, tablet::TabletStatePB::STOPPED, kOpTimeout));

  // Wipe the master and rebuild it; the tombstone must be skipped.
  NO_FATALS(cluster_->master()->Shutdown());
  ASSERT_OK(cluster_->master()->DeleteFromDisk());
  string tool_stdout;
  string tool_stderr;
  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers,
                                         /*is_secure*/false, /*log_to_stderr*/true),
                        &tool_stdout, &tool_stderr));
  const string skipped_msg =
      Substitute("Skipping replica of tablet $0 of table $1", tablet_id, kTable);
  ASSERT_STR_CONTAINS(tool_stderr, skipped_msg);
  ASSERT_STR_CONTAINS(tool_stdout, "Rebuilt from 3 tablet servers, of which 0 had errors");
  ASSERT_STR_CONTAINS(tool_stdout, "Rebuilt from 0 replicas, of which 0 had errors");

  // Bring the whole cluster back up.
  for (int ts_idx = 0; ts_idx < cluster_->num_tablet_servers(); ++ts_idx) {
    cluster_->tablet_server(ts_idx)->Shutdown();
  }
  ASSERT_OK(cluster_->Restart());

  // The table was not rebuilt, so opening it must fail with NotFound.
  KuduClientBuilder builder;
  ASSERT_OK(cluster_->CreateClient(&builder, &client_));
  client::sp::shared_ptr<KuduTable> table;
  const Status open_status = client_->OpenTable(kTable, &table);
  ASSERT_TRUE(open_status.IsNotFound()) << open_status.ToString();

  // Creating a fresh table under the same name should still succeed.
  NO_FATALS(MakeTestTable(kTable, /*num_rows*/10, /*num_replicas*/1, cluster_.get()));
  ASSERT_OK(client_->OpenTable(kTable, &table));
  ClusterVerifier verifier(cluster_.get());
  NO_FATALS(verifier.CheckCluster());
}
// Rebuilds the master while one tablet server still holds a replica with an
// outdated schema, then checks that the master and all tablet servers end up
// with the newest schema and that the table is alterable, writable and readable.
TEST_F(AdminCliTest, TestAddColumnsAndRebuildMaster) {
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());
  // Add a column and shut down a tserver; that tserver keeps a schema with a lower version.
  {
    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableId));
    table_alterer->AddColumn("old_column_0")->Type(KuduColumnSchema::INT32);
    ASSERT_OK(table_alterer->Alter());
  }
  NO_FATALS(cluster_->tablet_server(0)->Shutdown());
  // Add another column; the latest schema has a higher version.
  {
    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableId));
    table_alterer->AddColumn("old_column_1")->Type(KuduColumnSchema::INT32);
    ASSERT_OK(table_alterer->Alter());
  }
  // Shut down the master and wipe out its data.
  NO_FATALS(cluster_->master()->Shutdown());
  ASSERT_OK(cluster_->master()->DeleteFromDisk());
  // Restart the tserver that was shut down; it still holds the lower-version schema.
  ASSERT_OK(cluster_->tablet_server(0)->Restart());
  // Rebuild the master with the tool from 2 tablet servers. The tool first
  // sees the outdated schema on tserver-0, then uses the newer schema on
  // tserver-1 to rebuild the master.
  // Named 'tool_stdout' rather than 'stdout' to avoid shadowing the C stdio macro.
  string tool_stdout;
  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, 2,
                                         /*is_secure*/false, /*log_to_stderr*/true,
                                         "", /*default_replica_num*/3),
                        &tool_stdout));
  ASSERT_STR_CONTAINS(tool_stdout, "Rebuilt from 2 tablet servers, of which 0 had errors");
  ASSERT_STR_CONTAINS(tool_stdout, "Rebuilt from 2 replicas, of which 0 had errors");
  // Restart the master and the tablet servers.
  // The tablet servers must be restarted so they accept the new master's certs.
  for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
    cluster_->tablet_server(i)->Shutdown();
  }
  ASSERT_OK(cluster_->Restart());
  NO_FATALS(WaitForTSAndReplicas());
  // Wait for the cluster to become healthy.
  const string master_address = cluster_->master()->bound_rpc_addr().ToString();
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(RunKuduTool({"cluster", "ksck", master_address}, nullptr, nullptr));
  });
  // The client has to be rebuilt since there's a new master.
  KuduClientBuilder builder;
  ASSERT_OK(cluster_->CreateClient(&builder, &client_));
  // Make sure we can add columns to the table we rebuilt.
  {
    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableId));
    table_alterer->AddColumn("new_column_0")->Type(KuduColumnSchema::INT32);
    ASSERT_OK(table_alterer->Alter());
  }
  // Check that the master and all tservers have the latest schema.
  ASSERT_OK(RunKuduTool({"table", "describe", master_address, kTableId}, &tool_stdout));
  ASSERT_STR_MATCHES(tool_stdout, "old_column_0.*old_column_1.*new_column_0");
  for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
    const string ts_addr = cluster_->tablet_server(i)->bound_rpc_addr().ToString();
    ASSERT_OK(RunKuduTool({"remote_replica", "list", ts_addr, "-include_schema"},
                          &tool_stdout));
    ASSERT_STR_MATCHES(tool_stdout, "old_column_0.*old_column_1.*new_column_0");
  }
  // Check the altered table schema in the client view, and check that the
  // table is writable and readable.
  {
    KuduSchema schema;
    ASSERT_OK(client_->GetTableSchema(kTableId, &schema));
    ASSERT_EQ(6, schema.num_columns());
    // KuduColumnSchema has no default constructor, so initialize it from the
    // first column before using it as an out-parameter.
    KuduColumnSchema col_schema = schema.Column(0);
    ASSERT_TRUE(schema.HasColumn("old_column_0", &col_schema));
    ASSERT_TRUE(schema.HasColumn("old_column_1", &col_schema));
    ASSERT_TRUE(schema.HasColumn("new_column_0", &col_schema));
    client::sp::shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(kTableId, &table));
    TestWorkload workload(cluster_.get());
    workload.set_table_name(kTableId);
    workload.set_num_write_threads(1);
    workload.set_schema(table->schema());
    workload.Setup();
    workload.Start();
    while (workload.rows_inserted() < 1000) {
      SleepFor(MonoDelta::FromMilliseconds(10));
    }
    workload.StopAndJoin();
    vector<string> rows;
    // ScanTableToStrings reports errors via ASSERTs; stop on a fatal failure.
    NO_FATALS(ScanTableToStrings(table.get(), &rows));
    // Without this, the per-row check below would pass vacuously on an empty scan.
    ASSERT_FALSE(rows.empty());
    for (const auto& row : rows) {
      ASSERT_STR_MATCHES(row, "old_column_0.*old_column_1.*new_column_0");
    }
  }
}
class SecureClusterAdminCliTest : public KuduTest {
public:
void SetUpCluster(bool is_secure) {
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = 3;
if (is_secure) {
opts.enable_kerberos = true;
opts.principal = "oryx";
}
cluster_.reset(new ExternalMiniCluster(std::move(opts)));
ASSERT_OK(cluster_->Start());
KuduClientBuilder builder;
builder.sasl_protocol_name(kPrincipal);
ASSERT_OK(cluster_->CreateClient(&builder, &client_));
}
void SetUp() override {
NO_FATALS(SetUpCluster(/*is_secure*/true));
}
protected:
unique_ptr<ExternalMiniCluster> cluster_;
client::sp::shared_ptr<KuduClient> client_;
};
// Lists the masters of a Kerberized cluster through the CLI, passing the
// non-default SASL protocol name explicitly.
TEST_F(SecureClusterAdminCliTest, TestNonDefaultPrincipal) {
  const string protocol_flag = Substitute("--sasl_protocol_name=$0", kPrincipal);
  const string master_addrs =
      HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs());
  const vector<string> tool_args = { "master", "list", protocol_flag, master_addrs };
  ASSERT_OK(RunKuduTool(tool_args));
}
// Parameterized variant of SecureClusterAdminCliTest: the boolean test
// parameter selects whether the cluster is started with Kerberos enabled.
class SecureClusterAdminCliParamTest : public SecureClusterAdminCliTest,
                                       public ::testing::WithParamInterface<bool> {
 public:
  void SetUp() override {
    // SecureClusterAdminCliTest::SetUp() is deliberately not called because it
    // always starts a secure cluster. Run the KuduTest setup directly instead.
    KuduTest::SetUp();
    NO_FATALS(SetUpCluster(/*is_secure*/GetParam()));
  }
};
// Basic test that the master rebuilder works in the happy case: the data read
// after the rebuild must match the data read before it, and the cluster must
// remain healthy and usable for DDL.
TEST_P(SecureClusterAdminCliParamTest, TestRebuildMaster) {
  const bool is_secure = GetParam();
  constexpr const char* kPreRebuildTableName = "pre_rebuild";
  constexpr const char* kPostRebuildTableName = "post_rebuild";
  constexpr int kNumRows = 10000;
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  // Create a table and insert some rows.
  NO_FATALS(MakeTestTable(kPreRebuildTableName, kNumRows, /*num_replicas*/3, cluster_.get()));
  // Scan the table to strings so we can check it isn't corrupted post-rebuild.
  vector<string> rows;
  {
    client::sp::shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(kPreRebuildTableName, &table));
    NO_FATALS(ScanTableToStrings(table.get(), &rows));
  }
  // An empty baseline would make the post-rebuild comparison vacuous.
  ASSERT_FALSE(rows.empty());
  // Shut down the master and wipe out its data.
  NO_FATALS(cluster_->master()->Shutdown());
  ASSERT_OK(cluster_->master()->DeleteFromDisk());
  // Rebuild the master with the tool.
  // Named 'tool_stdout' rather than 'stdout' to avoid shadowing the C stdio macro.
  string tool_stdout;
  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers, is_secure),
                        &tool_stdout));
  ASSERT_STR_CONTAINS(tool_stdout,
                      "Rebuilt from 3 tablet servers, of which 0 had errors");
  ASSERT_STR_CONTAINS(tool_stdout, "Rebuilt from 3 replicas, of which 0 had errors");
  // Restart the master and the tablet servers.
  // The tablet servers must be restarted so they accept the new master's certs.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    cluster_->tablet_server(i)->Shutdown();
  }
  ASSERT_OK(cluster_->Restart());
  // Verify we can read back exactly what we read before the rebuild.
  // The client has to be rebuilt since there's a new master.
  KuduClientBuilder builder;
  ASSERT_OK(cluster_->CreateClient(&builder, &client_));
  vector<string> postrebuild_rows;
  {
    client::sp::shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(kPreRebuildTableName, &table));
    NO_FATALS(ScanTableToStrings(table.get(), &postrebuild_rows));
  }
  ASSERT_EQ(rows.size(), postrebuild_rows.size());
  // size_t index: avoids a signed/unsigned comparison against rows.size().
  for (size_t i = 0; i < rows.size(); i++) {
    ASSERT_EQ(rows[i], postrebuild_rows[i]) << "Mismatch at row " << i
                                            << " of " << rows.size();
  }
  // The cluster should still be considered healthy.
  FLAGS_sasl_protocol_name = kPrincipal;
  ClusterVerifier cv(cluster_.get());
  NO_FATALS(cv.CheckCluster());
  // Make sure we can delete the table we created before the rebuild.
  ASSERT_OK(client_->DeleteTable(kPreRebuildTableName));
  // Make sure we can create a table and write to it.
  NO_FATALS(MakeTestTable(kPostRebuildTableName, kNumRows, /*num_replicas*/3, cluster_.get()));
  NO_FATALS(cv.CheckCluster());
}
// Rebuilds the master from scratch, then checks that columns can be added to
// the rebuilt table and that every row read back exposes the new columns.
TEST_P(SecureClusterAdminCliParamTest, TestRebuildMasterAndAddColumns) {
  const bool is_secure = GetParam();
  constexpr const char* kTableName = "rebuild_and_add_columns";
  constexpr int kNumRows = 10000;
  // Number of INT32 columns ("c0", "c1", ...) added after the rebuild.
  constexpr int kNumNewColumns = 10;
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  // Create a table and insert some rows.
  NO_FATALS(MakeTestTable(kTableName, kNumRows, /*num_replicas*/3, cluster_.get()));
  // Shut down the master and wipe out its data.
  NO_FATALS(cluster_->master()->Shutdown());
  ASSERT_OK(cluster_->master()->DeleteFromDisk());
  // Rebuild the master with the tool.
  // Named 'tool_stdout' rather than 'stdout' to avoid shadowing the C stdio macro.
  string tool_stdout;
  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers, is_secure),
                        &tool_stdout));
  ASSERT_STR_CONTAINS(tool_stdout,
                      "Rebuilt from 3 tablet servers, of which 0 had errors");
  ASSERT_STR_CONTAINS(tool_stdout, "Rebuilt from 3 replicas, of which 0 had errors");
  // Restart the master and the tablet servers.
  // The tablet servers must be restarted so they accept the new master's certs.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    cluster_->tablet_server(i)->Shutdown();
  }
  ASSERT_OK(cluster_->Restart());
  // The client has to be rebuilt since there's a new master.
  KuduClientBuilder builder;
  ASSERT_OK(cluster_->CreateClient(&builder, &client_));
  // Make sure we can add columns to the table we rebuilt.
  unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
  for (int i = 0; i < kNumNewColumns; i++) {
    table_alterer->AddColumn(Substitute("c$0", i))->Type(KuduColumnSchema::INT32);
  }
  ASSERT_OK(table_alterer->Alter());
  // Check the altered table is readable and that its rows carry the new columns.
  {
    vector<string> rows;
    client::sp::shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(kTableName, &table));
    NO_FATALS(ScanTableToStrings(table.get(), &rows));
    ASSERT_FALSE(rows.empty());
    const string new_columns_pattern = Substitute("c0.*c$0", kNumNewColumns - 1);
    for (const auto& row : rows) {
      ASSERT_STR_MATCHES(row, new_columns_pattern);
    }
  }
  // The cluster should still be considered healthy.
  FLAGS_sasl_protocol_name = kPrincipal;
  ClusterVerifier cv(cluster_.get());
  NO_FATALS(cv.CheckCluster());
}
// Run every SecureClusterAdminCliParamTest case twice: first against an
// unsecured cluster (false), then against a Kerberized one (true).
INSTANTIATE_TEST_SUITE_P(IsSecure, SecureClusterAdminCliParamTest, ::testing::Values(false, true));
} // namespace tools
} // namespace kudu