| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <sys/stat.h> |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstdio> |
| #include <cstdlib> |
| #include <iterator> |
| #include <map> |
| #include <memory> |
| #include <sstream> |
| #include <string> |
| #include <tuple> // IWYU pragma: keep |
| #include <type_traits> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <boost/optional/optional.hpp> |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <glog/stl_logging.h> |
| #include <gmock/gmock-matchers.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/cfile/cfile-test-base.h" |
| #include "kudu/cfile/cfile_util.h" |
| #include "kudu/cfile/cfile_writer.h" |
| #include "kudu/client/client-test-util.h" |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/partition.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/types.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/log.h" |
| #include "kudu/consensus/log_util.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/consensus/ref_counted_replicate.h" |
| #include "kudu/fs/block_id.h" |
| #include "kudu/fs/block_manager.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/fs/fs_report.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/escaping.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/numbers.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/strcat.h" |
| #include "kudu/gutil/strings/strip.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/strings/util.h" |
| #include "kudu/hms/hive_metastore_types.h" |
| #include "kudu/hms/hms_catalog.h" |
| #include "kudu/hms/hms_client.h" |
| #include "kudu/hms/mini_hms.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/cluster_verifier.h" |
| #include "kudu/integration-tests/mini_cluster_fs_inspector.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/mini-cluster/mini_cluster.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/tablet/local_tablet_writer.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet-harness.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet.pb.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/thrift/client.h" |
| #include "kudu/tools/tool.pb.h" |
| #include "kudu/tools/tool_action_common.h" |
| #include "kudu/tools/tool_replica_util.h" |
| #include "kudu/tools/tool_test_util.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/tablet_server_options.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/tserver/tserver_admin.proxy.h" |
| #include "kudu/util/async_util.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/int128_util.h" // IWYU pragma: keep |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/oid_generator.h" |
| #include "kudu/util/path_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/subprocess.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| #include "kudu/util/url-coding.h" |
| |
| DECLARE_bool(hive_metastore_sasl_enabled); |
| DECLARE_string(block_manager); |
| DECLARE_string(hive_metastore_uris); |
| |
| METRIC_DECLARE_counter(bloom_lookups); |
| METRIC_DECLARE_entity(tablet); |
| |
| using boost::optional; |
| using kudu::cfile::CFileWriter; |
| using kudu::cfile::StringDataGenerator; |
| using kudu::cfile::WriterOptions; |
| using kudu::client::KuduClient; |
| using kudu::client::KuduClientBuilder; |
| using kudu::client::KuduScanToken; |
| using kudu::client::KuduScanTokenBuilder; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduSchemaBuilder; |
| using kudu::client::KuduTable; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::ExternalMiniCluster; |
| using kudu::cluster::ExternalMiniClusterOptions; |
| using kudu::cluster::ExternalTabletServer; |
| using kudu::cluster::InternalMiniCluster; |
| using kudu::cluster::InternalMiniClusterOptions; |
| using kudu::consensus::OpId; |
| using kudu::consensus::RECEIVED_OPID; |
| using kudu::consensus::ReplicateMsg; |
| using kudu::consensus::ReplicateRefPtr; |
| using kudu::fs::BlockDeletionTransaction; |
| using kudu::fs::FsReport; |
| using kudu::fs::WritableBlock; |
| using kudu::hms::HmsCatalog; |
| using kudu::hms::HmsClient; |
| using kudu::itest::MiniClusterFsInspector; |
| using kudu::itest::TServerDetails; |
| using kudu::log::Log; |
| using kudu::log::LogOptions; |
| using kudu::rpc::RpcController; |
| using kudu::tablet::LocalTabletWriter; |
| using kudu::tablet::Tablet; |
| using kudu::tablet::TabletDataState; |
| using kudu::tablet::TabletHarness; |
| using kudu::tablet::TabletMetadata; |
| using kudu::tablet::TabletReplica; |
| using kudu::tablet::TabletSuperBlockPB; |
| using kudu::tserver::DeleteTabletRequestPB; |
| using kudu::tserver::DeleteTabletResponsePB; |
| using kudu::tserver::ListTabletsResponsePB; |
| using kudu::tserver::MiniTabletServer; |
| using kudu::tserver::WriteRequestPB; |
| using std::back_inserter; |
| using std::copy; |
| using std::make_pair; |
| using std::map; |
| using std::ostringstream; |
| using std::pair; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tools { |
| |
| class ToolTest : public KuduTest { |
| public: |
| ~ToolTest() { |
| STLDeleteValues(&ts_map_); |
| } |
| |
| virtual void TearDown() OVERRIDE { |
| if (cluster_) cluster_->Shutdown(); |
| if (mini_cluster_) mini_cluster_->Shutdown(); |
| KuduTest::TearDown(); |
| } |
| |
| virtual bool EnableKerberos() { |
| return false; |
| } |
| |
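| // Runs the 'kudu' tool with the given space-separated argument string, optionally |
| // feeding 'in' to the tool's standard input. stdout and stderr are captured both |
| // as whole strings (with the trailing newline stripped) and as vectors of lines |
| // (with trailing empty lines stripped). |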
| Status RunTool(const string& arg_str, |
| string* stdout = nullptr, |
| string* stderr = nullptr, |
| vector<string>* stdout_lines = nullptr, |
| vector<string>* stderr_lines = nullptr, |
| const string& in = "") const { |
| string out; |
| string err; |
| Status s = RunKuduTool(strings::Split(arg_str, " ", strings::SkipEmpty()), |
| &out, &err, in); |
| if (stdout) { |
| *stdout = out; |
| StripTrailingNewline(stdout); |
| } |
| if (stderr) { |
| *stderr = err; |
| StripTrailingNewline(stderr); |
| } |
| if (stdout_lines) { |
| *stdout_lines = strings::Split(out, "\n"); |
| while (!stdout_lines->empty() && stdout_lines->back() == "") { |
| stdout_lines->pop_back(); |
| } |
| } |
| if (stderr_lines) { |
| *stderr_lines = strings::Split(err, "\n"); |
| while (!stderr_lines->empty() && stderr_lines->back() == "") { |
| stderr_lines->pop_back(); |
| } |
| } |
| return s; |
| } |
| |
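| // Runs the given action, asserting that it succeeds and produces no stdout output. |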
| void RunActionStdoutNone(const string& arg_str) const { |
| string stdout; |
| string stderr; |
| Status s = RunTool(arg_str, &stdout, &stderr, nullptr, nullptr); |
| SCOPED_TRACE(stdout); |
| SCOPED_TRACE(stderr); |
| ASSERT_OK(s); |
| ASSERT_TRUE(stdout.empty()); |
| } |
| |
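| // Runs the given action, asserting that it succeeds and capturing its stdout. |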
| void RunActionStdoutString(const string& arg_str, string* stdout) const { |
| string stderr; |
| Status s = RunTool(arg_str, stdout, &stderr, nullptr, nullptr); |
| SCOPED_TRACE(*stdout); |
| SCOPED_TRACE(stderr); |
| ASSERT_OK(s); |
| } |
| |
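| // Like RunActionStdoutString(), but also feeds 'stdin' to the tool's standard input. |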
| void RunActionStdinStdoutString(const string& arg_str, const string& stdin, |
| string* stdout) const { |
| string stderr; |
| Status s = RunTool(arg_str, stdout, &stderr, nullptr, nullptr, stdin); |
| SCOPED_TRACE(*stdout); |
| SCOPED_TRACE(stderr); |
| ASSERT_OK(s); |
| } |
| |
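| // Runs the given action, capturing its stderr and returning the tool's status. |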
| Status RunActionStderrString(const string& arg_str, string* stderr) const { |
| return RunTool(arg_str, nullptr, stderr, nullptr, nullptr); |
| } |
| |
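| // Runs the given action, capturing both stdout and stderr and returning the tool's status. |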
| Status RunActionStdoutStderrString(const string& arg_str, string* stdout, |
| string* stderr) const { |
| return RunTool(arg_str, stdout, stderr, nullptr, nullptr); |
| } |
| |
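| // Runs the given action, asserting that it succeeds and capturing stdout as a vector of lines. |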
| void RunActionStdoutLines(const string& arg_str, vector<string>* stdout_lines) const { |
| string stderr; |
| Status s = RunTool(arg_str, nullptr, &stderr, stdout_lines, nullptr); |
| SCOPED_TRACE(*stdout_lines); |
| SCOPED_TRACE(stderr); |
| ASSERT_OK(s); |
| } |
| |
| // Run tool with specified arguments, expecting help output. |
| void RunTestHelp(const string& arg_str, |
| const vector<string>& regexes, |
| const Status& expected_status = Status::OK()) const { |
| vector<string> stdout; |
| vector<string> stderr; |
| Status s = RunTool(arg_str, nullptr, nullptr, &stdout, &stderr); |
| SCOPED_TRACE(stdout); |
| SCOPED_TRACE(stderr); |
| |
| // These are always true for showing help. |
| ASSERT_TRUE(s.IsRuntimeError()); |
| ASSERT_TRUE(stdout.empty()); |
| ASSERT_FALSE(stderr.empty()); |
| |
| // The usage string is on the second line, or on the third if an error message |
| // precedes it (i.e. the command was invalid). |
| int usage_idx = 1; |
| if (!expected_status.ok()) { |
| ASSERT_EQ(expected_status.ToString(), stderr[1]); |
| usage_idx = 2; |
| } |
| ASSERT_EQ(0, stderr[usage_idx].find("Usage: ")); |
| |
| // Strip away everything up to the usage string to test for regexes. |
| const vector<string> remaining_lines(stderr.begin() + usage_idx + 1, |
| stderr.end()); |
| for (const auto& r : regexes) { |
| ASSERT_STRINGS_ANY_MATCH(remaining_lines, r); |
| } |
| } |
| |
| // Run the tool without a required positional argument, expecting an error message. |
| void RunActionMissingRequiredArg(const string& arg_str, const string& required_arg, |
| bool variadic = false) const { |
| const string kPositionalArgumentMessage = "must provide positional argument"; |
| const string kVariadicArgumentMessage = "must provide variadic positional argument"; |
| const string& message = variadic ? kVariadicArgumentMessage : kPositionalArgumentMessage; |
| Status expected_status = Status::InvalidArgument(Substitute("$0 $1", message, required_arg)); |
| |
| vector<string> err_lines; |
| RunTool(arg_str, nullptr, nullptr, nullptr, /* stderr_lines = */ &err_lines); |
| ASSERT_GE(err_lines.size(), 4) << err_lines; |
| ASSERT_EQ(expected_status.ToString(), err_lines[1]); |
| ASSERT_STR_MATCHES(err_lines[3], "Usage: kudu.*"); |
| } |
| |
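| // Runs 'fs check' with the given arguments, verifying the reported counts of |
| // live, missing, and orphaned blocks. If any blocks are expected to be missing, |
| // the tool itself is expected to fail with a corruption error. |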
| void RunFsCheck(const string& arg_str, |
| int expected_num_live, |
| const string& tablet_id, |
| const vector<BlockId>& expected_missing_blocks, |
| int expected_num_orphaned) { |
| string stdout; |
| string stderr; |
| Status s = RunTool(arg_str, &stdout, &stderr, nullptr, nullptr); |
| SCOPED_TRACE(stdout); |
| SCOPED_TRACE(stderr); |
| if (!expected_missing_blocks.empty()) { |
| ASSERT_TRUE(s.IsRuntimeError()); |
| ASSERT_STR_CONTAINS(stderr, "Corruption"); |
| } else { |
| ASSERT_TRUE(s.ok()); |
| } |
| // Some stats aren't gathered for the FBM: see FileBlockManager::Open. |
| ASSERT_STR_CONTAINS( |
| stdout, Substitute("Total live blocks: $0", |
| FLAGS_block_manager == "file" ? 0 : expected_num_live)); |
| ASSERT_STR_CONTAINS( |
| stdout, Substitute("Total missing blocks: $0", expected_missing_blocks.size())); |
| if (!expected_missing_blocks.empty()) { |
| ASSERT_STR_CONTAINS( |
| stdout, Substitute("Fatal error: tablet $0 missing blocks: ", tablet_id)); |
| for (const auto& b : expected_missing_blocks) { |
| ASSERT_STR_CONTAINS(stdout, b.ToString()); |
| } |
| } |
| ASSERT_STR_CONTAINS( |
| stdout, Substitute("Total orphaned blocks: $0", expected_num_orphaned)); |
| } |
| |
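| // Sets 'found' to true if 'dir' contains at least one file with '.bak' in its name. |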
| Status HasAtLeastOneBackupFile(const string& dir, bool* found) { |
| vector<string> children; |
| RETURN_NOT_OK(env_->GetChildren(dir, &children)); |
| *found = false; |
| for (const auto& child : children) { |
| if (child.find(".bak") != string::npos) { |
| *found = true; |
| break; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| protected: |
| void RunLoadgen(int num_tservers = 1, |
| const vector<string>& tool_args = {}, |
| const string& table_name = ""); |
| void StartExternalMiniCluster(ExternalMiniClusterOptions opts = {}); |
| void StartMiniCluster(InternalMiniClusterOptions opts = {}); |
| unique_ptr<ExternalMiniCluster> cluster_; |
| unique_ptr<MiniClusterFsInspector> inspect_; |
| unordered_map<string, TServerDetails*> ts_map_; |
| unique_ptr<InternalMiniCluster> mini_cluster_; |
| }; |
| |
| // Subclass of ToolTest that allows running individual test cases with Kerberos |
| // enabled and disabled. Most of the test cases are run only with Kerberos |
| // disabled, but to get coverage against a Kerberized cluster we run select |
| // cases in both modes. |
| class ToolTestKerberosParameterized : public ToolTest, public ::testing::WithParamInterface<bool> { |
| public: |
| bool EnableKerberos() override { |
| return GetParam(); |
| } |
| }; |
| INSTANTIATE_TEST_CASE_P(ToolTestKerberosParameterized, ToolTestKerberosParameterized, |
| ::testing::Values(false, true)); |
| |
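| // Starts an external mini cluster with the given options and initializes the |
| // filesystem inspector and tablet server map used by the test cases. |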
| void ToolTest::StartExternalMiniCluster(ExternalMiniClusterOptions opts) { |
| cluster_.reset(new ExternalMiniCluster(std::move(opts))); |
| ASSERT_OK(cluster_->Start()); |
| inspect_.reset(new MiniClusterFsInspector(cluster_.get())); |
| ASSERT_OK(CreateTabletServerMap(cluster_->master_proxy(), |
| cluster_->messenger(), &ts_map_)); |
| } |
| |
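| // Starts an in-process mini cluster with the given options. |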
| void ToolTest::StartMiniCluster(InternalMiniClusterOptions opts) { |
| mini_cluster_.reset(new InternalMiniCluster(env_, std::move(opts))); |
| ASSERT_OK(mini_cluster_->Start()); |
| } |
| |
| TEST_F(ToolTest, TestHelpXML) { |
| string stdout; |
| string stderr; |
| Status s = RunTool("--helpxml", &stdout, &stderr, nullptr, nullptr); |
| |
| ASSERT_TRUE(s.IsRuntimeError()); |
| ASSERT_FALSE(stdout.empty()); |
| ASSERT_TRUE(stderr.empty()); |
| |
| // All output is wrapped in an AllModes node. |
| ASSERT_STR_MATCHES(stdout, "<\\?xml version=\"1.0\"\\?><AllModes>.*</AllModes>"); |
| |
| // Verify all modes are output |
| const vector<string> modes = { |
| "cluster", |
| "diagnose", |
| "fs", |
| "local_replica", |
| "master", |
| "pbc", |
| "perf", |
| "remote_replica", |
| "table", |
| "tablet", |
| "test", |
| "tserver", |
| "wal", |
| "dump", |
| "cmeta", |
| "change_config" |
| }; |
| |
| for (const auto& mode : modes) { |
| ASSERT_STR_MATCHES(stdout, Substitute(".*<mode><name>$0</name>.*</mode>.*", mode)); |
| } |
| } |
| |
| TEST_F(ToolTest, TestTopLevelHelp) { |
| const vector<string> kTopLevelRegexes = { |
| "cluster.*Kudu cluster", |
| "diagnose.*Diagnostic tools.*", |
| "fs.*Kudu filesystem", |
| "local_replica.*tablet replicas", |
| "master.*Kudu Master", |
| "pbc.*protobuf container", |
| "perf.*performance of a Kudu cluster", |
| "remote_replica.*tablet replicas on a Kudu Tablet Server", |
| "table.*Kudu tables", |
| "tablet.*Kudu tablets", |
| "test.*test actions", |
| "tserver.*Kudu Tablet Server", |
| "wal.*write-ahead log" |
| }; |
| NO_FATALS(RunTestHelp("", kTopLevelRegexes)); |
| NO_FATALS(RunTestHelp("--help", kTopLevelRegexes)); |
| NO_FATALS(RunTestHelp("not_a_mode", kTopLevelRegexes, |
| Status::InvalidArgument("unknown command 'not_a_mode'"))); |
| } |
| |
| TEST_F(ToolTest, TestModeHelp) { |
| { |
| const vector<string> kFsModeRegexes = { |
| "check.*Kudu filesystem for inconsistencies", |
| "dump.*Dump a Kudu filesystem", |
| "format.*new Kudu filesystem", |
| "list.*List metadata for on-disk tablets, rowsets, blocks", |
| "update_dirs.*Updates the set of data directories", |
| }; |
| NO_FATALS(RunTestHelp("fs", kFsModeRegexes)); |
| NO_FATALS(RunTestHelp("fs not_a_mode", kFsModeRegexes, |
| Status::InvalidArgument("unknown command 'not_a_mode'"))); |
| } |
| { |
| const vector<string> kFsDumpModeRegexes = { |
| "block.*binary contents of a data block", |
| "cfile.*contents of a CFile", |
| "tree.*tree of a Kudu filesystem", |
| "uuid.*UUID of a Kudu filesystem" |
| }; |
| NO_FATALS(RunTestHelp("fs dump", kFsDumpModeRegexes)); |
| } |
| { |
| const vector<string> kLocalReplicaModeRegexes = { |
| "cmeta.*Operate on a local tablet replica's consensus", |
| "data_size.*Summarize the data size", |
| "dump.*Dump a Kudu filesystem", |
| "copy_from_remote.*Copy a tablet replica", |
| "delete.*Delete a tablet replica from the local filesystem", |
| "list.*Show list of tablet replicas" |
| }; |
| NO_FATALS(RunTestHelp("local_replica", kLocalReplicaModeRegexes)); |
| } |
| { |
| const vector<string> kLocalReplicaDumpModeRegexes = { |
| "block_ids.*Dump the IDs of all blocks", |
| "meta.*Dump the metadata", |
| "rowset.*Dump the rowset contents", |
| "wals.*Dump all WAL" |
| }; |
| NO_FATALS(RunTestHelp("local_replica dump", kLocalReplicaDumpModeRegexes)); |
| } |
| { |
| const vector<string> kLocalReplicaCMetaRegexes = { |
| "print_replica_uuids.*Print all tablet replica peer UUIDs", |
| "rewrite_raft_config.*Rewrite a tablet replica", |
| "set_term.*Bump the current term" |
| }; |
| NO_FATALS(RunTestHelp("local_replica cmeta", kLocalReplicaCMetaRegexes)); |
| // Try with a hyphen instead of an underscore. |
| NO_FATALS(RunTestHelp("local-replica cmeta", kLocalReplicaCMetaRegexes)); |
| } |
| { |
| const vector<string> kLocalReplicaCopyFromRemoteRegexes = { |
| "Copy a tablet replica from a remote server" |
| }; |
| NO_FATALS(RunTestHelp("local_replica copy_from_remote --help", |
| kLocalReplicaCopyFromRemoteRegexes)); |
| // Try with hyphens instead of underscores. |
| NO_FATALS(RunTestHelp("local-replica copy-from-remote --help", |
| kLocalReplicaCopyFromRemoteRegexes)); |
| } |
| { |
| const vector<string> kClusterModeRegexes = { |
| "ksck.*Check the health of a Kudu cluster", |
| }; |
| NO_FATALS(RunTestHelp("cluster", kClusterModeRegexes)); |
| } |
| { |
| const vector<string> kDiagnoseModeRegexes = { |
| "parse_stacks.*Parse sampled stack traces", |
| }; |
| NO_FATALS(RunTestHelp("diagnose", kDiagnoseModeRegexes)); |
| } |
| { |
| const vector<string> kMasterModeRegexes = { |
| "get_flags.*Get the gflags", |
| "set_flag.*Change a gflag value", |
| "status.*Get the status", |
| "timestamp.*Get the current timestamp" |
| }; |
| NO_FATALS(RunTestHelp("master", kMasterModeRegexes)); |
| } |
| { |
| const vector<string> kPbcModeRegexes = { |
| "dump.*Dump a PBC", |
| }; |
| NO_FATALS(RunTestHelp("pbc", kPbcModeRegexes)); |
| } |
| { |
| const vector<string> kPerfRegexes = { |
| "loadgen.*Run load generation with optional scan afterwards", |
| }; |
| NO_FATALS(RunTestHelp("perf", kPerfRegexes)); |
| } |
| { |
| const vector<string> kRemoteReplicaModeRegexes = { |
| "check.*Check if all tablet replicas", |
| "copy.*Copy a tablet replica from one Kudu Tablet Server", |
| "delete.*Delete a tablet replica", |
| "dump.*Dump the data of a tablet replica", |
| "list.*List all tablet replicas", |
| "unsafe_change_config.*Force the specified replica to adopt" |
| }; |
| NO_FATALS(RunTestHelp("remote_replica", kRemoteReplicaModeRegexes)); |
| } |
| { |
| const vector<string> kTableModeRegexes = { |
| "delete.*Delete a table", |
| "rename_table.*Rename a table", |
| "rename_column.*Rename a column", |
| "list.*List tables", |
| }; |
| NO_FATALS(RunTestHelp("table", kTableModeRegexes)); |
| } |
| { |
| const vector<string> kTabletModeRegexes = { |
| "change_config.*Change.*Raft configuration", |
| "leader_step_down.*Force the tablet's leader replica to step down" |
| }; |
| NO_FATALS(RunTestHelp("tablet", kTabletModeRegexes)); |
| } |
| { |
| const vector<string> kTestModeRegexes = { |
| "mini_cluster.*Spawn a control shell" |
| }; |
| NO_FATALS(RunTestHelp("test", kTestModeRegexes)); |
| } |
| { |
| const vector<string> kChangeConfigModeRegexes = { |
| "add_replica.*Add a new replica", |
| "change_replica_type.*Change the type of an existing replica", |
| "move_replica.*Move a tablet replica", |
| "remove_replica.*Remove an existing replica" |
| }; |
| NO_FATALS(RunTestHelp("tablet change_config", kChangeConfigModeRegexes)); |
| } |
| { |
| const vector<string> kTServerModeRegexes = { |
| "get_flags.*Get the gflags", |
| "set_flag.*Change a gflag value", |
| "status.*Get the status", |
| "timestamp.*Get the current timestamp", |
| "list.*List tablet servers" |
| }; |
| NO_FATALS(RunTestHelp("tserver", kTServerModeRegexes)); |
| } |
| { |
| const vector<string> kWalModeRegexes = { |
| "dump.*Dump a WAL", |
| }; |
| NO_FATALS(RunTestHelp("wal", kWalModeRegexes)); |
| } |
| } |
| |
| TEST_F(ToolTest, TestActionHelp) { |
| const vector<string> kFormatActionRegexes = { |
| "-fs_wal_dir \\(Directory", |
| "-fs_data_dirs \\(Comma-separated list", |
| "-uuid \\(The uuid" |
| }; |
| NO_FATALS(RunTestHelp("fs format --help", kFormatActionRegexes)); |
| NO_FATALS(RunTestHelp("fs format extra_arg", kFormatActionRegexes, |
| Status::InvalidArgument("too many arguments: 'extra_arg'"))); |
| } |
| |
| TEST_F(ToolTest, TestActionMissingRequiredArg) { |
| NO_FATALS(RunActionMissingRequiredArg("master list", "master_addresses")); |
| NO_FATALS(RunActionMissingRequiredArg("cluster ksck --master_addresses=master.example.com", |
| "master_addresses")); |
| NO_FATALS(RunActionMissingRequiredArg("local_replica cmeta rewrite_raft_config fake_id", |
| "peers", /* variadic */ true)); |
| } |
| |
| TEST_F(ToolTest, TestFsCheck) { |
| const string kTestDir = GetTestPath("test"); |
| const string kTabletId = "ffffffffffffffffffffffffffffffff"; |
| const Schema kSchema(GetSimpleTestSchema()); |
| const Schema kSchemaWithIds(SchemaBuilder(kSchema).Build()); |
| |
| // Create a local replica, flush some rows a few times, and collect all |
| // of the created block IDs. |
| vector<BlockId> block_ids; |
| { |
| TabletHarness::Options opts(kTestDir); |
| opts.tablet_id = kTabletId; |
| TabletHarness harness(kSchemaWithIds, opts); |
| ASSERT_OK(harness.Create(true)); |
| ASSERT_OK(harness.Open()); |
| LocalTabletWriter writer(harness.tablet().get(), &kSchema); |
| KuduPartialRow row(&kSchemaWithIds); |
| |
| for (int num_flushes = 0; num_flushes < 10; num_flushes++) { |
| for (int i = 0; i < 10; i++) { |
| ASSERT_OK(row.SetInt32(0, num_flushes * i)); |
| ASSERT_OK(row.SetInt32(1, num_flushes * i * 10)); |
| ASSERT_OK(row.SetStringCopy(2, "HelloWorld")); |
| ASSERT_OK(writer.Insert(row)); |
| } |
| ASSERT_OK(harness.tablet()->Flush()); |
| } |
| block_ids = harness.tablet()->metadata()->CollectBlockIds(); |
| harness.tablet()->Shutdown(); |
| } |
| |
| // Check the filesystem; all the blocks should be accounted for, and there |
| // should be no blocks missing or orphaned. |
| NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir), |
| block_ids.size(), kTabletId, {}, 0)); |
| |
| // Delete half of the blocks. On the next check, only half of the blocks can be |
| // found; the rest are reported as missing. |
| vector<BlockId> missing_ids; |
| { |
| FsManager fs(env_, kTestDir); |
| FsReport report; |
| ASSERT_OK(fs.Open(&report)); |
| std::shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| fs.block_manager()->NewDeletionTransaction(); |
| for (int i = 0; i < block_ids.size(); i += 2) { |
| deletion_transaction->AddDeletedBlock(block_ids[i]); |
| } |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&missing_ids)); |
| } |
| NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir), |
| block_ids.size() / 2, kTabletId, missing_ids, 0)); |
| |
| // Delete the tablet superblock. The next check finds half of the blocks, |
| // though without the superblock they're all considered to be orphaned. |
| // |
| // Here we check twice to show that if --repair isn't provided, there should |
| // be no effect. |
| { |
| FsManager fs(env_, kTestDir); |
| FsReport report; |
| ASSERT_OK(fs.Open(&report)); |
| ASSERT_OK(env_->DeleteFile(fs.GetTabletMetadataPath(kTabletId))); |
| } |
| for (int i = 0; i < 2; i++) { |
| NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir), |
| block_ids.size() / 2, kTabletId, {}, block_ids.size() / 2)); |
| } |
| |
| // Repair the filesystem. The remaining half of the blocks are found, deemed |
| // orphaned, and deleted. The next check shows no remaining blocks. |
| NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0 --repair", kTestDir), |
| block_ids.size() / 2, kTabletId, {}, block_ids.size() / 2)); |
| NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir), |
| 0, kTabletId, {}, 0)); |
| } |
| |
| TEST_F(ToolTest, TestFsCheckLiveServer) { |
| NO_FATALS(StartExternalMiniCluster()); |
| string args = Substitute("fs check --fs_wal_dir $0 --fs_data_dirs $1", |
| cluster_->GetWalPath("master-0"), |
| JoinStrings(cluster_->GetDataPaths("master-0"), ",")); |
| NO_FATALS(RunFsCheck(args, 0, "", {}, 0)); |
| args += " --repair"; |
| string stdout; |
| string stderr; |
| Status s = RunTool(args, &stdout, &stderr, nullptr, nullptr); |
| SCOPED_TRACE(stdout); |
| SCOPED_TRACE(stderr); |
| ASSERT_TRUE(s.IsRuntimeError()); |
| ASSERT_TRUE(stdout.empty()); |
| ASSERT_STR_CONTAINS(stderr, "Could not lock"); |
| } |
| |
| TEST_F(ToolTest, TestFsFormat) { |
| const string kTestDir = GetTestPath("test"); |
| NO_FATALS(RunActionStdoutNone(Substitute("fs format --fs_wal_dir=$0", kTestDir))); |
| FsManager fs(env_, kTestDir); |
| ASSERT_OK(fs.Open()); |
| |
| ObjectIdGenerator generator; |
| string canonicalized_uuid; |
| ASSERT_OK(generator.Canonicalize(fs.uuid(), &canonicalized_uuid)); |
| ASSERT_EQ(fs.uuid(), canonicalized_uuid); |
| } |
| |
| TEST_F(ToolTest, TestFsFormatWithUuid) { |
| const string kTestDir = GetTestPath("test"); |
| ObjectIdGenerator generator; |
| string original_uuid = generator.Next(); |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "fs format --fs_wal_dir=$0 --uuid=$1", kTestDir, original_uuid))); |
| FsManager fs(env_, kTestDir); |
| ASSERT_OK(fs.Open()); |
| |
| string canonicalized_uuid; |
| ASSERT_OK(generator.Canonicalize(fs.uuid(), &canonicalized_uuid)); |
| ASSERT_EQ(fs.uuid(), canonicalized_uuid); |
| ASSERT_EQ(fs.uuid(), original_uuid); |
| } |
| |
| TEST_F(ToolTest, TestFsDumpUuid) { |
| const string kTestDir = GetTestPath("test"); |
| string uuid; |
| { |
| FsManager fs(env_, kTestDir); |
| ASSERT_OK(fs.CreateInitialFileSystemLayout()); |
| ASSERT_OK(fs.Open()); |
| uuid = fs.uuid(); |
| } |
| string stdout; |
| NO_FATALS(RunActionStdoutString(Substitute( |
| "fs dump uuid --fs_wal_dir=$0", kTestDir), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_EQ(uuid, stdout); |
| } |
| |
| TEST_F(ToolTest, TestPbcTools) { |
| const string kTestDir = GetTestPath("test"); |
| string uuid; |
| string instance_path; |
| { |
| ObjectIdGenerator generator; |
| FsManager fs(env_, kTestDir); |
| ASSERT_OK(fs.CreateInitialFileSystemLayout(generator.Next())); |
| ASSERT_OK(fs.Open()); |
| uuid = fs.uuid(); |
| instance_path = fs.GetInstanceMetadataPath(kTestDir); |
| } |
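| // Test the default 'pbc dump' output. |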
| { |
| vector<string> stdout; |
| NO_FATALS(RunActionStdoutLines(Substitute( |
| "pbc dump $0", instance_path), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_EQ(4, stdout.size()); |
| ASSERT_EQ("Message 0", stdout[0]); |
| ASSERT_EQ("-------", stdout[1]); |
| ASSERT_EQ(Substitute("uuid: \"$0\"", uuid), stdout[2]); |
| ASSERT_STR_MATCHES(stdout[3], "^format_stamp: \"Formatted at .*\"$"); |
| } |
| // Test dump --debug |
| { |
| vector<string> stdout; |
| NO_FATALS(RunActionStdoutLines(Substitute( |
| "pbc dump $0 --debug", instance_path), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_EQ(12, stdout.size()); |
| ASSERT_EQ("File header", stdout[0]); |
| ASSERT_EQ("-------", stdout[1]); |
| ASSERT_STR_MATCHES(stdout[2], "^Protobuf container version:"); |
| ASSERT_STR_MATCHES(stdout[3], "^Total container file size:"); |
| ASSERT_STR_MATCHES(stdout[4], "^Entry PB type:"); |
| ASSERT_EQ("Message 0", stdout[6]); |
| ASSERT_STR_MATCHES(stdout[7], "^offset:"); |
| ASSERT_STR_MATCHES(stdout[8], "^length:"); |
| ASSERT_EQ("-------", stdout[9]); |
| ASSERT_EQ(Substitute("uuid: \"$0\"", uuid), stdout[10]); |
| ASSERT_STR_MATCHES(stdout[11], "^format_stamp: \"Formatted at .*\"$"); |
| } |
| // Test dump --oneline |
| { |
| string stdout; |
| NO_FATALS(RunActionStdoutString(Substitute( |
| "pbc dump $0/instance --oneline", kTestDir), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_STR_MATCHES(stdout, Substitute( |
| "^0\tuuid: \"$0\" format_stamp: \"Formatted at .*\"$$", uuid)); |
| } |
| // Test dump --json |
| { |
| // Since the UUID is declared as 'bytes' rather than 'string' in the PB, it is |
| // dumped base64-encoded. |
| string uuid_b64; |
| Base64Encode(uuid, &uuid_b64); |
| |
| string stdout; |
| NO_FATALS(RunActionStdoutString(Substitute( |
| "pbc dump $0/instance --json", kTestDir), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_STR_MATCHES(stdout, Substitute( |
| "^\\{\"uuid\":\"$0\",\"formatStamp\":\"Formatted at .*\"\\}$$", uuid_b64)); |
| } |
| |
| // Utility to set the editor up based on the given shell command. |
| auto DoEdit = [&](const string& editor_shell, string* stdout, string* stderr = nullptr, |
| const string& extra_flags = "") { |
| const string editor_path = GetTestPath("editor"); |
| CHECK_OK(WriteStringToFile(Env::Default(), |
| StrCat("#!/usr/bin/env bash\n", editor_shell), |
| editor_path)); |
| chmod(editor_path.c_str(), 0755); |
| setenv("EDITOR", editor_path.c_str(), /* overwrite */1); |
| return RunTool(Substitute("pbc edit $0 $1/instance", extra_flags, kTestDir), |
| stdout, stderr, nullptr, nullptr); |
| }; |
| |
| // Test 'edit' by setting up EDITOR to be a sed script which performs a substitution. |
| { |
| string stdout; |
| ASSERT_OK(DoEdit("exec sed -i -e s/Formatted/Edited/ \"$@\"\n", &stdout, nullptr, |
| "--nobackup")); |
| ASSERT_EQ("", stdout); |
| |
| // Dump to make sure the edit took place. |
| NO_FATALS(RunActionStdoutString(Substitute( |
| "pbc dump $0/instance --oneline", kTestDir), &stdout)); |
| ASSERT_STR_MATCHES(stdout, Substitute( |
| "^0\tuuid: \"$0\" format_stamp: \"Edited at .*\"$$", uuid)); |
| |
| // Make sure no backup file was written. |
| bool found_backup; |
| ASSERT_OK(HasAtLeastOneBackupFile(kTestDir, &found_backup)); |
| ASSERT_FALSE(found_backup); |
| } |
| |
| // Test 'edit' with a backup. |
| { |
| string stdout; |
| ASSERT_OK(DoEdit("exec sed -i -e s/Formatted/Edited/ \"$@\"\n", &stdout)); |
| ASSERT_EQ("", stdout); |
| |
| // Make sure a backup file was written. |
| bool found_backup; |
| ASSERT_OK(HasAtLeastOneBackupFile(kTestDir, &found_backup)); |
| ASSERT_TRUE(found_backup); |
| } |
| |
| // Test 'edit' with an unsuccessful edit. |
| { |
| string stdout, stderr; |
| string path; |
| ASSERT_OK(FindExecutable("false", {}, &path)); |
| Status s = DoEdit(path, &stdout, &stderr); |
| ASSERT_FALSE(s.ok()); |
| ASSERT_EQ("", stdout); |
| ASSERT_STR_CONTAINS(stderr, "Aborted: editor returned non-zero exit code"); |
| } |
| |
| // Test 'edit' with an edit which tries to write some invalid JSON (missing required fields). |
| { |
| string stdout, stderr; |
| Status s = DoEdit("echo {} > $@\n", &stdout, &stderr); |
| ASSERT_EQ("", stdout); |
| ASSERT_STR_MATCHES(stderr, |
| "Invalid argument: Unable to parse JSON line: \\{\\}: " |
| ": missing field .*"); |
| // NOTE: the above extra ':' is due to an apparent bug in protobuf. |
| } |
| |
| // Test 'edit' with an edit that writes some invalid JSON (bad syntax) |
| { |
| string stdout, stderr; |
| Status s = DoEdit("echo not-a-json-string > $@\n", &stdout, &stderr); |
| ASSERT_EQ("", stdout); |
| ASSERT_STR_CONTAINS( |
| stderr, |
| "Invalid argument: Unable to parse JSON line: not-a-json-string: Unexpected token.\n" |
| "not-a-json-string\n" |
| "^"); |
| } |
| |
| // The file should be unchanged by the unsuccessful edits above. |
| { |
| string stdout; |
| NO_FATALS(RunActionStdoutString(Substitute( |
| "pbc dump $0/instance --oneline", kTestDir), &stdout)); |
| ASSERT_STR_MATCHES(stdout, Substitute( |
| "^0\tuuid: \"$0\" format_stamp: \"Edited at .*\"$$", uuid)); |
| } |
| } |
| |
| TEST_F(ToolTest, TestFsDumpCFile) { |
| const int kNumEntries = 8192; |
| const string kTestDir = GetTestPath("test"); |
| FsManager fs(env_, kTestDir); |
| ASSERT_OK(fs.CreateInitialFileSystemLayout()); |
| ASSERT_OK(fs.Open()); |
| |
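| // Write a CFile block containing kNumEntries generated string values, with a |
| // positional index enabled. |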
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(fs.CreateNewBlock({}, &block)); |
| BlockId block_id = block->id(); |
| StringDataGenerator<false> generator("hello %04d"); |
| WriterOptions opts; |
| opts.write_posidx = true; |
| CFileWriter writer(opts, GetTypeInfo(generator.kDataType), |
| generator.has_nulls(), std::move(block)); |
| ASSERT_OK(writer.Start()); |
| generator.Build(kNumEntries); |
| ASSERT_OK_FAST(writer.AppendEntries(generator.values(), kNumEntries)); |
| ASSERT_OK(writer.Finish()); |
| |
| { |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "fs dump cfile --fs_wal_dir=$0 $1 --noprint_meta --noprint_rows", |
| kTestDir, block_id.ToString()))); |
| } |
| vector<string> stdout; |
| { |
| NO_FATALS(RunActionStdoutLines(Substitute( |
| "fs dump cfile --fs_wal_dir=$0 $1 --noprint_rows", |
| kTestDir, block_id.ToString()), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_GE(stdout.size(), 4); |
| ASSERT_EQ(stdout[0], "Header:"); |
| ASSERT_EQ(stdout[2], "Footer:"); |
| } |
| { |
| NO_FATALS(RunActionStdoutLines(Substitute( |
| "fs dump cfile --fs_wal_dir=$0 $1 --noprint_meta", |
| kTestDir, block_id.ToString()), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_EQ(kNumEntries, stdout.size()); |
| } |
| { |
| NO_FATALS(RunActionStdoutLines(Substitute( |
| "fs dump cfile --fs_wal_dir=$0 $1", |
| kTestDir, block_id.ToString()), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_GT(stdout.size(), kNumEntries); |
| ASSERT_EQ(stdout[0], "Header:"); |
| ASSERT_EQ(stdout[2], "Footer:"); |
| } |
| } |
| |
| TEST_F(ToolTest, TestFsDumpBlock) { |
| const string kTestDir = GetTestPath("test"); |
| FsManager fs(env_, kTestDir); |
| ASSERT_OK(fs.CreateInitialFileSystemLayout()); |
| ASSERT_OK(fs.Open()); |
| |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(fs.CreateNewBlock({}, &block)); |
| ASSERT_OK(block->Append("hello world")); |
| ASSERT_OK(block->Close()); |
| BlockId block_id = block->id(); |
| |
| { |
| string stdout; |
| NO_FATALS(RunActionStdoutString(Substitute( |
| "fs dump block --fs_wal_dir=$0 $1", |
| kTestDir, block_id.ToString()), &stdout)); |
| ASSERT_EQ("hello world", stdout); |
| } |
| } |
| |
| TEST_F(ToolTest, TestWalDump) { |
| const string kTestDir = GetTestPath("test"); |
| const string kTestTablet = "ffffffffffffffffffffffffffffffff"; |
| const Schema kSchema(GetSimpleTestSchema()); |
| const Schema kSchemaWithIds(SchemaBuilder(kSchema).Build()); |
| |
| FsManager fs(env_, kTestDir); |
| ASSERT_OK(fs.CreateInitialFileSystemLayout()); |
| ASSERT_OK(fs.Open()); |
| |
| { |
| scoped_refptr<Log> log; |
| ASSERT_OK(Log::Open(LogOptions(), |
| &fs, |
| kTestTablet, |
| kSchemaWithIds, |
| 0, // schema_version |
| scoped_refptr<MetricEntity>(), |
| &log)); |
| |
| OpId opid = consensus::MakeOpId(1, 1); |
| ReplicateRefPtr replicate = |
| consensus::make_scoped_refptr_replicate(new ReplicateMsg()); |
| replicate->get()->set_op_type(consensus::WRITE_OP); |
| replicate->get()->mutable_id()->CopyFrom(opid); |
| replicate->get()->set_timestamp(1); |
| WriteRequestPB* write = replicate->get()->mutable_write_request(); |
| ASSERT_OK(SchemaToPB(kSchema, write->mutable_schema())); |
| AddTestRowToPB(RowOperationsPB::INSERT, kSchema, |
| opid.index(), |
| 0, |
| "this is a test insert", |
| write->mutable_row_operations()); |
| write->set_tablet_id(kTestTablet); |
| Synchronizer s; |
| ASSERT_OK(log->AsyncAppendReplicates({ replicate }, s.AsStatusCallback())); |
| ASSERT_OK(s.Wait()); |
| } |
| |
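| // Dump the WAL both directly ('wal dump' on the segment file) and through the |
| // tablet ('local_replica dump wals'), exercising the --print_entries modes and |
| // related flags. |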
| string wal_path = fs.GetWalSegmentFileName(kTestTablet, 1); |
| string stdout; |
| for (const auto& args : { Substitute("wal dump $0", wal_path), |
| Substitute("local_replica dump wals --fs_wal_dir=$0 $1", |
| kTestDir, kTestTablet) |
| }) { |
| SCOPED_TRACE(args); |
| for (const auto& print_entries : { "true", "1", "yes", "decoded" }) { |
| SCOPED_TRACE(print_entries); |
| NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=$1", |
| args, print_entries), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_STR_MATCHES(stdout, "Header:"); |
| ASSERT_STR_MATCHES(stdout, "1\\.1@1"); |
| ASSERT_STR_MATCHES(stdout, "this is a test insert"); |
| ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); |
| ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); |
| ASSERT_STR_MATCHES(stdout, "Footer:"); |
| } |
| for (const auto& print_entries : { "false", "0", "no" }) { |
| SCOPED_TRACE(print_entries); |
| NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=$1", |
| args, print_entries), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_STR_MATCHES(stdout, "Header:"); |
| ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); |
| ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert"); |
| ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); |
| ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); |
| ASSERT_STR_MATCHES(stdout, "Footer:"); |
| } |
| { |
| NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=pb", |
| args), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_STR_MATCHES(stdout, "Header:"); |
| ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); |
| ASSERT_STR_MATCHES(stdout, "this is a test insert"); |
| ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); |
| ASSERT_STR_MATCHES(stdout, "row_operations \\{"); |
| ASSERT_STR_MATCHES(stdout, "Footer:"); |
| } |
| { |
| NO_FATALS(RunActionStdoutString(Substitute( |
| "$0 --print_entries=pb --truncate_data=1", args), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_STR_MATCHES(stdout, "Header:"); |
| ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); |
| ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert"); |
| ASSERT_STR_MATCHES(stdout, "t<truncated>"); |
| ASSERT_STR_MATCHES(stdout, "row_operations \\{"); |
| ASSERT_STR_MATCHES(stdout, "Footer:"); |
| } |
| { |
| NO_FATALS(RunActionStdoutString(Substitute( |
| "$0 --print_entries=id", args), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_STR_MATCHES(stdout, "Header:"); |
| ASSERT_STR_MATCHES(stdout, "1\\.1@1"); |
| ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert"); |
| ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); |
| ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); |
| ASSERT_STR_MATCHES(stdout, "Footer:"); |
| } |
| { |
| NO_FATALS(RunActionStdoutString(Substitute( |
| "$0 --print_meta=false", args), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_STR_NOT_MATCHES(stdout, "Header:"); |
| ASSERT_STR_MATCHES(stdout, "1\\.1@1"); |
| ASSERT_STR_MATCHES(stdout, "this is a test insert"); |
| ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); |
| ASSERT_STR_NOT_MATCHES(stdout, "Footer:"); |
| } |
| } |
| } |
| |
| TEST_F(ToolTest, TestLocalReplicaDumpMeta) { |
| const string kTestDir = GetTestPath("test"); |
| const string kTestTablet = "ffffffffffffffffffffffffffffffff"; |
| const string kTestTableId = "test-table"; |
| const string kTestTableName = "test-fs-meta-dump-table"; |
| const Schema kSchema(GetSimpleTestSchema()); |
| const Schema kSchemaWithIds(SchemaBuilder(kSchema).Build()); |
| |
| FsManager fs(env_, kTestDir); |
| ASSERT_OK(fs.CreateInitialFileSystemLayout()); |
| ASSERT_OK(fs.Open()); |
| |
| pair<PartitionSchema, Partition> partition = tablet::CreateDefaultPartition( |
| kSchemaWithIds); |
| scoped_refptr<TabletMetadata> meta; |
| ASSERT_OK(TabletMetadata::CreateNew(&fs, kTestTablet, kTestTableName, kTestTableId, |
| kSchemaWithIds, partition.first, partition.second, |
| tablet::TABLET_DATA_READY, |
| /*tombstone_last_logged_opid=*/ boost::none, |
| &meta)); |
| string stdout; |
| NO_FATALS(RunActionStdoutString(Substitute("local_replica dump meta $0 " |
| "--fs_wal_dir=$1 " |
| "--fs_data_dirs=$2", |
| kTestTablet, kTestDir, |
| kTestDir), &stdout)); |
| |
| // Verify the contents of the metadata output |
| SCOPED_TRACE(stdout); |
| string debug_str = meta->partition_schema() |
| .PartitionDebugString(meta->partition(), meta->schema()); |
| StripWhiteSpace(&debug_str); |
| ASSERT_STR_CONTAINS(stdout, debug_str); |
| debug_str = Substitute("Table name: $0 Table id: $1", |
| meta->table_name(), meta->table_id()); |
| ASSERT_STR_CONTAINS(stdout, debug_str); |
| debug_str = Substitute("Schema (version=$0):", meta->schema_version()); |
| ASSERT_STR_CONTAINS(stdout, debug_str); |
| debug_str = meta->schema().ToString(); |
| StripWhiteSpace(&debug_str); |
| ASSERT_STR_CONTAINS(stdout, debug_str); |
| |
| TabletSuperBlockPB pb1; |
| ASSERT_OK(meta->ToSuperBlock(&pb1)); |
| debug_str = pb_util::SecureDebugString(pb1); |
| StripWhiteSpace(&debug_str); |
| ASSERT_STR_CONTAINS(stdout, "Superblock:"); |
| ASSERT_STR_CONTAINS(stdout, debug_str); |
| } |
| |
| TEST_F(ToolTest, TestFsDumpTree) { |
| const string kTestDir = GetTestPath("test"); |
| const Schema kSchema(GetSimpleTestSchema()); |
| const Schema kSchemaWithIds(SchemaBuilder(kSchema).Build()); |
| |
| FsManager fs(env_, kTestDir); |
| ASSERT_OK(fs.CreateInitialFileSystemLayout()); |
| ASSERT_OK(fs.Open()); |
| |
| string stdout; |
| NO_FATALS(RunActionStdoutString(Substitute("fs dump tree --fs_wal_dir=$0 " |
| "--fs_data_dirs=$1", |
| kTestDir, kTestDir), &stdout)); |
| |
| // It suffices to verify the contents of the top-level tree structure. |
| SCOPED_TRACE(stdout); |
| ostringstream tree_out; |
| fs.DumpFileSystemTree(tree_out); |
| string tree_out_str = tree_out.str(); |
| StripWhiteSpace(&tree_out_str); |
| ASSERT_EQ(stdout, tree_out_str); |
| } |
| |
| TEST_F(ToolTest, TestLocalReplicaOps) { |
| const string kTestDir = GetTestPath("test"); |
| |
| const string kTestTablet = "ffffffffffffffffffffffffffffffff"; |
| const int kRowId = 100; |
| const Schema kSchema(GetSimpleTestSchema()); |
| const Schema kSchemaWithIds(SchemaBuilder(kSchema).Build()); |
| |
| TabletHarness::Options opts(kTestDir); |
| opts.tablet_id = kTestTablet; |
| TabletHarness harness(kSchemaWithIds, opts); |
| ASSERT_OK(harness.Create(true)); |
| ASSERT_OK(harness.Open()); |
| LocalTabletWriter writer(harness.tablet().get(), &kSchema); |
| KuduPartialRow row(&kSchemaWithIds); |
| for (int i = 0; i < 10; i++) { |
| ASSERT_OK(row.SetInt32(0, i)); |
| ASSERT_OK(row.SetInt32(1, i * 10)); |
| ASSERT_OK(row.SetStringCopy(2, "HelloWorld")); |
| ASSERT_OK(writer.Insert(row)); |
| } |
| ASSERT_OK(harness.tablet()->Flush()); |
| harness.tablet()->Shutdown(); |
| string fs_paths = "--fs_wal_dir=" + kTestDir + " " |
| "--fs_data_dirs=" + kTestDir; |
| { |
| string stdout; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("local_replica dump block_ids $0 $1", |
| kTestTablet, fs_paths), &stdout)); |
| |
| SCOPED_TRACE(stdout); |
| string tablet_out = "Listing all data blocks in tablet " + kTestTablet; |
| ASSERT_STR_CONTAINS(stdout, tablet_out); |
| ASSERT_STR_CONTAINS(stdout, "Rowset "); |
| ASSERT_STR_MATCHES(stdout, "Column block for column ID .*"); |
| ASSERT_STR_CONTAINS(stdout, "key[int32 NOT NULL]"); |
| ASSERT_STR_CONTAINS(stdout, "int_val[int32 NOT NULL]"); |
| ASSERT_STR_CONTAINS(stdout, "string_val[string NULLABLE]"); |
| } |
| { |
| string stdout; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("local_replica dump rowset $0 $1", |
| kTestTablet, fs_paths), &stdout)); |
| |
| SCOPED_TRACE(stdout); |
| ASSERT_STR_CONTAINS(stdout, "Dumping rowset 0"); |
| ASSERT_STR_MATCHES(stdout, "RowSet metadata: .*"); |
| ASSERT_STR_MATCHES(stdout, "last_durable_dms_id: .*"); |
| ASSERT_STR_CONTAINS(stdout, "columns {"); |
| ASSERT_STR_CONTAINS(stdout, "block {"); |
| ASSERT_STR_CONTAINS(stdout, "}"); |
| ASSERT_STR_MATCHES(stdout, "column_id:.*"); |
| ASSERT_STR_CONTAINS(stdout, "bloom_block {"); |
| ASSERT_STR_MATCHES(stdout, "id: .*"); |
| ASSERT_STR_CONTAINS(stdout, "undo_deltas {"); |
| |
| ASSERT_STR_CONTAINS(stdout, |
| "RowIdxInBlock: 0; Base: (int32 key=0, int32 int_val=0," |
| " string string_val=\"HelloWorld\"); " |
| "Undo Mutations: [@1(DELETE)]; Redo Mutations: [];"); |
| ASSERT_STR_MATCHES(stdout, ".*---------------------.*"); |
| |
| // Dumping a nonexistent rowset index (kRowId) is expected to fail. |
| string stderr; |
| Status s = RunTool( |
| Substitute("local_replica dump rowset $0 $1 --rowset_index=$2", |
| kTestTablet, fs_paths, kRowId), |
| &stdout, &stderr, nullptr, nullptr); |
| ASSERT_TRUE(s.IsRuntimeError()); |
| SCOPED_TRACE(stderr); |
| string expected = "Could not find rowset " + SimpleItoa(kRowId) + |
| " in tablet id " + kTestTablet; |
| ASSERT_STR_CONTAINS(stderr, expected); |
| } |
| { |
| TabletMetadata* meta = harness.tablet()->metadata(); |
| string stdout; |
| string debug_str; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("local_replica dump meta $0 $1", |
| kTestTablet, fs_paths), &stdout)); |
| |
| SCOPED_TRACE(stdout); |
| debug_str = meta->partition_schema() |
| .PartitionDebugString(meta->partition(), meta->schema()); |
| StripWhiteSpace(&debug_str); |
| ASSERT_STR_CONTAINS(stdout, debug_str); |
| debug_str = Substitute("Table name: $0 Table id: $1", |
| meta->table_name(), meta->table_id()); |
| ASSERT_STR_CONTAINS(stdout, debug_str); |
| debug_str = Substitute("Schema (version=$0):", meta->schema_version()); |
| StripWhiteSpace(&debug_str); |
| ASSERT_STR_CONTAINS(stdout, debug_str); |
| debug_str = meta->schema().ToString(); |
| StripWhiteSpace(&debug_str); |
| ASSERT_STR_CONTAINS(stdout, debug_str); |
| |
| TabletSuperBlockPB pb1; |
| ASSERT_OK(meta->ToSuperBlock(&pb1)); |
| debug_str = pb_util::SecureDebugString(pb1); |
| StripWhiteSpace(&debug_str); |
| ASSERT_STR_CONTAINS(stdout, "Superblock:"); |
| ASSERT_STR_CONTAINS(stdout, debug_str); |
| } |
| { |
| string stdout; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("local_replica data_size $0 $1", |
| kTestTablet, fs_paths), &stdout)); |
| SCOPED_TRACE(stdout); |
| |
| string expected = R"( |
| table id | tablet id | rowset id | block type | size |
| -----------------+----------------------------------+-----------+------------------+------ |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | 0 | c10 (key) | 164B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | 0 | c11 (int_val) | 113B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | 0 | c12 (string_val) | 138B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | 0 | REDO | 0B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | 0 | UNDO | 169B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | 0 | BLOOM | 4.1K |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | 0 | PK | 0B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | 0 | * | 4.6K |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | * | c10 (key) | 164B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | * | c11 (int_val) | 113B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | * | c12 (string_val) | 138B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | * | REDO | 0B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | * | UNDO | 169B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | * | BLOOM | 4.1K |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | * | PK | 0B |
| KuduTableTestId | ffffffffffffffffffffffffffffffff | * | * | 4.6K |
| KuduTableTestId | * | * | c10 (key) | 164B |
| KuduTableTestId | * | * | c11 (int_val) | 113B |
| KuduTableTestId | * | * | c12 (string_val) | 138B |
| KuduTableTestId | * | * | REDO | 0B |
| KuduTableTestId | * | * | UNDO | 169B |
| KuduTableTestId | * | * | BLOOM | 4.1K |
| KuduTableTestId | * | * | PK | 0B |
| KuduTableTestId | * | * | * | 4.6K |
| )"; |
| // Preprocess stdout and our expected table so that we are less |
| // sensitive to small variations in encodings, id assignment, etc. |
| for (string* p : {&stdout, &expected}) { |
| // Replace any string of digits with a single '#'. |
| StripString(p, "0123456789.", '#'); |
| StripDupCharacters(p, '#', 0); |
| // Collapse whitespace to a single space. |
| StripDupCharacters(p, ' ', 0); |
| // Strip the leading and trailing whitespace. |
| StripWhiteSpace(p); |
| // Collapse '-'s to a single '-' so that different width columns |
| // don't change the width of the header line. |
| StripDupCharacters(p, '-', 0); |
| } |
| |
| EXPECT_EQ(stdout, expected); |
| } |
| { |
| string stdout; |
| NO_FATALS(RunActionStdoutString(Substitute("local_replica list $0", |
| fs_paths), &stdout)); |
| |
| SCOPED_TRACE(stdout); |
| ASSERT_STR_MATCHES(stdout, kTestTablet); |
| } |
| |
| // Test 'kudu fs list' tablet group. |
| { |
| string stdout; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("fs list $0 --columns=table,tablet-id --format=csv", |
| fs_paths), |
| &stdout)); |
| |
| SCOPED_TRACE(stdout); |
| EXPECT_EQ(stdout, "KuduTableTest,ffffffffffffffffffffffffffffffff"); |
| } |
| |
| // Test 'kudu fs list' rowset group. |
| { |
| string stdout; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("fs list $0 --columns=table,tablet-id,rowset-id --format=csv", |
| fs_paths), |
| &stdout)); |
| |
| SCOPED_TRACE(stdout); |
| EXPECT_EQ(stdout, "KuduTableTest,ffffffffffffffffffffffffffffffff,0"); |
| } |
| // Test 'kudu fs list' block group. |
| { |
| vector<string> stdout; |
| NO_FATALS(RunActionStdoutLines( |
| Substitute("fs list $0 " |
| "--columns=table,tablet-id,rowset-id,block-kind,column " |
| "--format=csv", |
| fs_paths), |
| &stdout)); |
| |
| SCOPED_TRACE(stdout); |
| ASSERT_EQ(5, stdout.size()); |
| EXPECT_EQ(stdout[0], Substitute("KuduTableTest,$0,0,column,key", kTestTablet)); |
| EXPECT_EQ(stdout[1], Substitute("KuduTableTest,$0,0,column,int_val", kTestTablet)); |
| EXPECT_EQ(stdout[2], Substitute("KuduTableTest,$0,0,column,string_val", kTestTablet)); |
| EXPECT_EQ(stdout[3], Substitute("KuduTableTest,$0,0,undo,", kTestTablet)); |
| EXPECT_EQ(stdout[4], Substitute("KuduTableTest,$0,0,bloom,", kTestTablet)); |
| } |
| |
| // Test 'kudu fs list' cfile group. |
| { |
| vector<string> stdout; |
| NO_FATALS(RunActionStdoutLines( |
| Substitute("fs list $0 " |
| "--columns=table,tablet-id,rowset-id,block-kind," |
| "column,cfile-encoding,cfile-num-values " |
| "--format=csv", |
| fs_paths), |
| &stdout)); |
| |
| SCOPED_TRACE(stdout); |
| ASSERT_EQ(5, stdout.size()); |
| EXPECT_EQ(stdout[0], |
| Substitute("KuduTableTest,$0,0,column,key,BIT_SHUFFLE,10", kTestTablet)); |
| EXPECT_EQ(stdout[1], |
| Substitute("KuduTableTest,$0,0,column,int_val,BIT_SHUFFLE,10", kTestTablet)); |
| EXPECT_EQ(stdout[2], |
| Substitute("KuduTableTest,$0,0,column,string_val,DICT_ENCODING,10", kTestTablet)); |
| EXPECT_EQ(stdout[3], |
| Substitute("KuduTableTest,$0,0,undo,,PLAIN_ENCODING,10", kTestTablet)); |
| EXPECT_EQ(stdout[4], |
| Substitute("KuduTableTest,$0,0,bloom,,PLAIN_ENCODING,0", kTestTablet)); |
| } |
| } |
| |
| // Create and start a Kudu mini cluster, optionally create a table in it, and |
| // then run the 'kudu perf loadgen ...' utility against it. |
| void ToolTest::RunLoadgen(int num_tservers, |
| const vector<string>& tool_args, |
| const string& table_name) { |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = num_tservers; |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| if (!table_name.empty()) { |
| static const string kKeyColumnName = "key"; |
| static const Schema kSchema = Schema( |
| { |
| ColumnSchema(kKeyColumnName, INT64), |
| ColumnSchema("bool_val", BOOL), |
| ColumnSchema("int8_val", INT8), |
| ColumnSchema("int16_val", INT16), |
| ColumnSchema("int32_val", INT32), |
| ColumnSchema("int64_val", INT64), |
| ColumnSchema("float_val", FLOAT), |
| ColumnSchema("double_val", DOUBLE), |
| ColumnSchema("decimal32_val", DECIMAL32, false, |
| NULL, NULL, ColumnStorageAttributes(), |
| ColumnTypeAttributes(9, 9)), |
| ColumnSchema("decimal64_val", DECIMAL64, false, |
| NULL, NULL, ColumnStorageAttributes(), |
| ColumnTypeAttributes(18, 2)), |
| ColumnSchema("decimal128_val", DECIMAL128, false, |
| NULL, NULL, ColumnStorageAttributes(), |
| ColumnTypeAttributes(38, 0)), |
| ColumnSchema("unixtime_micros_val", UNIXTIME_MICROS), |
| ColumnSchema("string_val", STRING), |
| ColumnSchema("binary_val", BINARY), |
| }, 1); |
| |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &client)); |
| KuduSchema client_schema(client::KuduSchemaFromSchema(kSchema)); |
| unique_ptr<client::KuduTableCreator> table_creator( |
| client->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(table_name) |
| .schema(&client_schema) |
| .add_hash_partitions({kKeyColumnName}, 2) |
| .num_replicas(cluster_->num_tablet_servers()) |
| .Create()); |
| } |
| vector<string> args = { |
| "perf", |
| "loadgen", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| }; |
| if (!table_name.empty()) { |
| args.push_back(Substitute("-table_name=$0", table_name)); |
| } |
| copy(tool_args.begin(), tool_args.end(), back_inserter(args)); |
| ASSERT_OK(RunKuduTool(args)); |
| } |
| |
| // Run the loadgen benchmark with all optional parameters set to defaults. |
| TEST_F(ToolTest, TestLoadgenDefaultParameters) { |
| NO_FATALS(RunLoadgen()); |
| } |
| |
| // Run the loadgen benchmark in AUTO_FLUSH_BACKGROUND mode, sequential values. |
| TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundSequential) { |
| NO_FATALS(RunLoadgen(3, |
| { |
| "--buffer_flush_watermark_pct=0.125", |
| "--buffer_size_bytes=65536", |
| "--buffers_num=8", |
| "--num_rows_per_thread=2048", |
| "--num_threads=4", |
| "--run_scan", |
| "--string_fixed=0123456789", |
| }, |
| "bench_auto_flush_background_sequential")); |
| } |
| |
| // Run loadgen benchmark in AUTO_FLUSH_BACKGROUND mode, randomized values. |
| TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundRandom) { |
| NO_FATALS(RunLoadgen(5, |
| { |
| "--buffer_flush_watermark_pct=0.125", |
| "--buffer_size_bytes=65536", |
| "--buffers_num=8", |
| // Use a small number of rows to avoid key collisions, since values are |
| // generated randomly in this mode. |
| "--num_rows_per_thread=16", |
| "--num_threads=1", |
| "--run_scan", |
| "--string_len=8", |
| "--use_random", |
| }, |
| "bench_auto_flush_background_random")); |
| } |
| |
| // Run the loadgen benchmark in MANUAL_FLUSH mode. |
| TEST_F(ToolTest, TestLoadgenManualFlush) { |
| NO_FATALS(RunLoadgen(3, |
| { |
| "--buffer_size_bytes=524288", |
| "--buffers_num=2", |
| "--flush_per_n_rows=1024", |
| "--num_rows_per_thread=4096", |
| "--num_threads=3", |
| "--run_scan", |
| "--show_first_n_errors=3", |
| "--string_len=16", |
| }, |
| "bench_manual_flush")); |
| } |
| |
| TEST_F(ToolTest, TestLoadgenServerSideDefaultNumReplicas) { |
| NO_FATALS(RunLoadgen(3, { "--table_num_replicas=0" })); |
| } |
| |
| TEST_F(ToolTest, TestLoadgenDatabaseName) { |
| NO_FATALS(RunLoadgen(1, { "--auto_database=foo", "--keep_auto_table=true" })); |
| string out; |
| NO_FATALS(RunActionStdoutString(Substitute("table list $0", |
| HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs())), &out)); |
| ASSERT_STR_CONTAINS(out, "foo.loadgen_auto_"); |
| } |
| |
| TEST_F(ToolTest, TestLoadgenHmsEnabled) { |
| ExternalMiniClusterOptions opts; |
| opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE; |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| string out; |
| NO_FATALS(RunActionStdoutString(Substitute("perf loadgen $0", |
| HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs())), &out)); |
| } |
| |
| // Run the loadgen, generating a few different partitioning schemas. |
| TEST_F(ToolTest, TestLoadgenAutoGenTablePartitioning) { |
| { |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = 1; |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| } |
| const vector<string> base_args = { |
| "perf", "loadgen", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| // Use a number of threads that isn't a divisor of the number of partitions |
| // so the insertion bounds of the threads don't align with the bounds of |
| // the partitions. This isn't necessary for the correctness of this test, |
| // but is a bit more unusual and, thus, worth testing. See the comments in |
| // tools/tool_action_perf.cc for more details about this partitioning. |
| "--num_threads=3", |
| |
| // Keep the tables so we can verify the presence of tablets as we go. |
| "--keep_auto_table=true", |
| |
| // Let's make sure nothing breaks even if we insert across the entire |
| // keyspace. If we didn't use `use_random`, the bounds of inserted data |
| // would be limited by the number of rows inserted. |
| "--use_random", |
| |
| // Let's also make sure we get the correct results. |
| "--run_scan", |
| }; |
| |
| const MonoDelta kTimeout = MonoDelta::FromMilliseconds(10); |
| TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()]; |
| |
| // Test with misconfigured partitioning. This should fail because we disallow |
| // creating tables with "no" partitioning. |
| vector<string> args(base_args); |
| args.emplace_back("--table_num_range_partitions=1"); |
| args.emplace_back("--table_num_hash_partitions=1"); |
| Status s = RunKuduTool(args); |
| ASSERT_FALSE(s.ok()); |
| |
| // Now let's try running with a couple of range partitions. |
| args = base_args; |
| args.emplace_back("--table_num_range_partitions=2"); |
| args.emplace_back("--table_num_hash_partitions=1"); |
| int expected_tablets = 2; |
| ASSERT_OK(RunKuduTool(args)); |
| ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout)); |
| |
| // Now let's try running with only hash partitions. |
| args = base_args; |
| args.emplace_back("--table_num_range_partitions=1"); |
| args.emplace_back("--table_num_hash_partitions=2"); |
| ASSERT_OK(RunKuduTool(args)); |
| // Note: we're not deleting the tables as we go so that we can do this check. |
| // That also means that we have to take into account the previous tables |
| // created during this test. |
| expected_tablets += 2; |
| ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout)); |
| |
| // And now with both. |
| args = base_args; |
| args.emplace_back("--table_num_range_partitions=2"); |
| args.emplace_back("--table_num_hash_partitions=2"); |
| expected_tablets += 4; |
| ASSERT_OK(RunKuduTool(args)); |
| ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout)); |
| } |
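| |
| // A quick sanity check of the tablet arithmetic above: an auto-generated |
| // table with R range partitions and H hash partitions comprises R * H |
| // tablets, so the running total on the tserver is 2*1 + 1*2 + 2*2 = 8. |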
| |
| // Test that a non-random workload results in the behavior we would expect when |
| // running against an auto-generated range partitioned table. |
| TEST_F(ToolTest, TestNonRandomWorkloadLoadgen) { |
| { |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = 1; |
| // Flush frequently so there are bloom files to check. |
| // |
| // Note: we use the number of bloom lookups as a loose indicator of whether |
| // writes are sequential or not. If row A is being inserted to a range of |
| // the keyspace that has already been inserted to, the interval tree that |
| // backs the tablet will be unable to say with certainty that row A does or |
| // doesn't already exist, necessitating a bloom lookup. As such, if there |
| // are bloom lookups for a tablet for a given workload, we can conclude |
| // that the workload is not sequential. |
| opts.extra_tserver_flags = { |
| "--flush_threshold_mb=1", |
| "--flush_threshold_secs=1", |
| }; |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| } |
| const vector<string> base_args = { |
| "perf", "loadgen", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| "--keep_auto_table", |
| |
| // Use the same number of threads as partitions so that, when we range |
| // partition, each thread writes to a single tablet. |
| "--num_threads=4", |
| |
| // Insert enough large rows to trigger flushes so there are bloom files |
| // to check. |
| "--num_rows_per_thread=10000", |
| "--string_len=32768", |
| |
| // Since we're using such large payloads, flush more frequently so the |
| // client doesn't run out of memory. |
| "--flush_per_n_rows=1", |
| }; |
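| |
| // Back-of-the-envelope math behind --flush_per_n_rows above, assuming the |
| // string column dominates the row size: 10000 rows per thread at 32768 bytes |
| // of string data each is roughly 312 MiB per thread if nothing were flushed, |
| // hence the aggressive per-row manual flush. |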
| |
| // Partition the table so each thread inserts to a single range. |
| vector<string> args = base_args; |
| args.emplace_back("--table_num_range_partitions=4"); |
| args.emplace_back("--table_num_hash_partitions=1"); |
| ASSERT_OK(RunKuduTool(args)); |
| |
| // Check that the insert workload didn't require any bloom lookups. |
| ExternalTabletServer* ts = cluster_->tablet_server(0); |
| int64_t bloom_lookups = 0; |
| ASSERT_OK(itest::GetInt64Metric(ts->bound_http_hostport(), |
| &METRIC_ENTITY_tablet, nullptr, &METRIC_bloom_lookups, "value", &bloom_lookups)); |
| ASSERT_EQ(0, bloom_lookups); |
| } |
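| |
| // For manual debugging, the same counter is exposed through the tserver's |
| // /metrics web endpoint; a hypothetical invocation (default tserver web |
| // port assumed): |
| // |
| //   curl -s http://127.0.0.1:8050/metrics   (then search for bloom_lookups) |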
| |
| // Test 'kudu remote_replica copy' tool when the destination tablet server is online. |
| // 1. Test the copy tool when the destination replica is healthy |
| // 2. Test the copy tool when the destination replica is tombstoned |
| // 3. Test the copy tool when the destination replica is deleted |
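| // |
| // For illustration, the copy invocations exercised below take the form |
| // (tablet id and addresses are hypothetical): |
| // |
| //   kudu remote_replica copy <tablet_id> <src_ts_rpc_addr> <dst_ts_rpc_addr> [--force_copy] |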
| TEST_F(ToolTest, TestRemoteReplicaCopy) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| const int kSrcTsIndex = 0; |
| const int kDstTsIndex = 1; |
| const int kNumTservers = 3; |
| const int kNumTablets = 3; |
| // This lets us specify wildcard IP addresses for the RPC servers |
| // on the tablet servers in the cluster, giving us test coverage |
| // for KUDU-1776. With this, 'kudu remote_replica copy' can be used to |
| // connect to tablet servers bound to wildcard IP addresses. |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = kNumTservers; |
| opts.bind_mode = cluster::MiniCluster::WILDCARD; |
| opts.extra_master_flags.emplace_back( |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| opts.extra_tserver_flags.emplace_back("--enable_leader_failure_detection=false"); |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| |
| // TestWorkload::Setup() internally creates a table. |
| TestWorkload workload(cluster_.get()); |
| workload.set_num_tablets(kNumTablets); |
| workload.set_num_replicas(3); |
| workload.Setup(); |
| |
| // Choose source and destination tablet servers for tablet_copy. |
| vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets; |
| TServerDetails* src_ts = ts_map_[cluster_->tablet_server(kSrcTsIndex)->uuid()]; |
| ASSERT_OK(WaitForNumTabletsOnTS(src_ts, kNumTablets, kTimeout, &tablets)); |
| TServerDetails* dst_ts = ts_map_[cluster_->tablet_server(kDstTsIndex)->uuid()]; |
| ASSERT_OK(WaitForNumTabletsOnTS(dst_ts, kNumTablets, kTimeout, &tablets)); |
| const string& healthy_tablet_id = tablets[0].tablet_status().tablet_id(); |
| |
| // Wait until the tablets are RUNNING before we start any copies. |
| ASSERT_OK(WaitUntilTabletInState(src_ts, healthy_tablet_id, tablet::RUNNING, kTimeout)); |
| ASSERT_OK(WaitUntilTabletInState(dst_ts, healthy_tablet_id, tablet::RUNNING, kTimeout)); |
| |
| // Test 1: Test when the destination replica is healthy, with and without the |
| // --force_copy flag. This is an 'online tablet copy', i.e. when the tool |
| // initiates a copy, the internal tablet-copy machinery deletes the existing |
| // healthy replica on the destination and copies the replica over, provided |
| // --force_copy is specified. Without the --force_copy flag, the copy fails |
| // since a healthy replica is already present on the destination tserver. |
| string stderr; |
| const string& src_ts_addr = cluster_->tablet_server(kSrcTsIndex)->bound_rpc_addr().ToString(); |
| const string& dst_ts_addr = cluster_->tablet_server(kDstTsIndex)->bound_rpc_addr().ToString(); |
| Status s = RunTool( |
| Substitute("remote_replica copy $0 $1 $2", |
| healthy_tablet_id, src_ts_addr, dst_ts_addr), |
| nullptr, &stderr, nullptr, nullptr); |
| ASSERT_TRUE(s.IsRuntimeError()); |
| SCOPED_TRACE(stderr); |
| ASSERT_STR_CONTAINS(stderr, "Rejecting tablet copy request"); |
| |
| NO_FATALS(RunActionStdoutNone(Substitute("remote_replica copy $0 $1 $2 --force_copy", |
| healthy_tablet_id, src_ts_addr, dst_ts_addr))); |
| ASSERT_OK(WaitUntilTabletInState(dst_ts, healthy_tablet_id, |
| tablet::RUNNING, kTimeout)); |
| |
| // Tests 2 and 3: test when the destination replica is tombstoned or deleted. |
| DeleteTabletRequestPB req; |
| DeleteTabletResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(kTimeout); |
| req.set_dest_uuid(dst_ts->uuid()); |
| const string& tombstoned_tablet_id = tablets[2].tablet_status().tablet_id(); |
| req.set_tablet_id(tombstoned_tablet_id); |
| req.set_delete_type(TabletDataState::TABLET_DATA_TOMBSTONED); |
| ASSERT_OK(dst_ts->tserver_admin_proxy->DeleteTablet(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()); |
| |
| // Shut down the destination server and delete one tablet from the local |
| // filesystem on the destination tserver while it is offline. |
| cluster_->tablet_server(kDstTsIndex)->Shutdown(); |
| const string& deleted_tablet_id = tablets[1].tablet_status().tablet_id(); |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "local_replica delete $0 --fs_wal_dir=$1 --fs_data_dirs=$2 --clean_unsafe", |
| deleted_tablet_id, cluster_->tablet_server(kDstTsIndex)->wal_dir(), |
| JoinStrings(cluster_->tablet_server(kDstTsIndex)->data_dirs(), ",")))); |
| |
| // At this point, we expect only 2 tablets to show up on the destination |
| // when we restart the destination tserver. deleted_tablet_id should not be |
| // found on the destination tserver until we copy it with the tool again. |
| ASSERT_OK(cluster_->tablet_server(kDstTsIndex)->Restart()); |
| vector<ListTabletsResponsePB::StatusAndSchemaPB> dst_tablets; |
| ASSERT_OK(WaitForNumTabletsOnTS(dst_ts, kNumTablets - 1, kTimeout, &dst_tablets)); |
| bool found_tombstoned_tablet = false; |
| for (const auto& t : dst_tablets) { |
| if (t.tablet_status().tablet_id() == tombstoned_tablet_id) { |
| found_tombstoned_tablet = true; |
| } |
| ASSERT_NE(t.tablet_status().tablet_id(), deleted_tablet_id); |
| } |
| ASSERT_TRUE(found_tombstoned_tablet); |
| // Wait until the destination tserver has tombstoned_tablet_id in tombstoned state. |
| NO_FATALS(inspect_->WaitForTabletDataStateOnTS(kDstTsIndex, tombstoned_tablet_id, |
| { TabletDataState::TABLET_DATA_TOMBSTONED }, |
| kTimeout)); |
| // Copy tombstoned_tablet_id from source to destination. |
| NO_FATALS(RunActionStdoutNone(Substitute("remote_replica copy $0 $1 $2 --force_copy", |
| tombstoned_tablet_id, src_ts_addr, dst_ts_addr))); |
| ASSERT_OK(WaitUntilTabletInState(dst_ts, tombstoned_tablet_id, |
| tablet::RUNNING, kTimeout)); |
| // Copy deleted_tablet_id from source to destination. |
| NO_FATALS(RunActionStdoutNone(Substitute("remote_replica copy $0 $1 $2", |
| deleted_tablet_id, src_ts_addr, dst_ts_addr))); |
| ASSERT_OK(WaitUntilTabletInState(dst_ts, deleted_tablet_id, |
| tablet::RUNNING, kTimeout)); |
| } |
| |
| // Test 'kudu local_replica delete' tool with --clean_unsafe flag for |
| // deleting the tablet from the tablet server. |
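| // |
| // A sketch of the invocation under test (tablet id and paths hypothetical): |
| // |
| //   kudu local_replica delete <tablet_id> --fs_wal_dir=<wal_dir> \ |
| //       --fs_data_dirs=<data_dirs> --clean_unsafe |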
| TEST_F(ToolTest, TestLocalReplicaDelete) { |
| NO_FATALS(StartMiniCluster()); |
| |
| // TestWorkload::Setup() internally creates a table. |
| TestWorkload workload(mini_cluster_.get()); |
| workload.set_num_replicas(1); |
| workload.Setup(); |
| workload.Start(); |
| |
| // There is only one tserver in the minicluster. |
| ASSERT_OK(mini_cluster_->WaitForTabletServerCount(1)); |
| MiniTabletServer* ts = mini_cluster_->mini_tablet_server(0); |
| |
| // Generate some workload followed by a flush to grow the size of the tablet on disk. |
| while (workload.rows_inserted() < 10000) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| workload.StopAndJoin(); |
| |
| // Make sure the tablet data is flushed to disk. This is needed |
| // so that we can compare the size of the data on disk before and |
| // after the deletion of local_replica to check if the size-on-disk |
| // is reduced at all. |
| string tablet_id; |
| { |
| vector<scoped_refptr<TabletReplica>> tablet_replicas; |
| ts->server()->tablet_manager()->GetTabletReplicas(&tablet_replicas); |
| ASSERT_EQ(1, tablet_replicas.size()); |
| Tablet* tablet = tablet_replicas[0]->tablet(); |
| ASSERT_OK(tablet->Flush()); |
| tablet_id = tablet_replicas[0]->tablet_id(); |
| } |
| const string& tserver_dir = ts->options()->fs_opts.wal_root; |
| // Using the delete tool while the tablet server is running fails. |
| string stderr; |
| Status s = RunTool( |
| Substitute("local_replica delete $0 --fs_wal_dir=$1 --fs_data_dirs=$1 " |
| "--clean_unsafe", tablet_id, tserver_dir), |
| nullptr, &stderr, nullptr, nullptr); |
| ASSERT_TRUE(s.IsRuntimeError()); |
| SCOPED_TRACE(stderr); |
| ASSERT_STR_CONTAINS(stderr, "Resource temporarily unavailable"); |
| |
| // Shut down tablet server and use the delete tool with --clean_unsafe. |
| ts->Shutdown(); |
| |
| const string& data_dir = JoinPathSegments(tserver_dir, "data"); |
| uint64_t size_before_delete; |
| ASSERT_OK(env_->GetFileSizeOnDiskRecursively(data_dir, &size_before_delete)); |
| NO_FATALS(RunActionStdoutNone(Substitute("local_replica delete $0 --fs_wal_dir=$1 " |
| "--fs_data_dirs=$1 --clean_unsafe", |
| tablet_id, tserver_dir))); |
| // Verify metadata and WAL segments for the tablet_id are gone. |
| const string& wal_dir = JoinPathSegments(tserver_dir, |
| Substitute("wals/$0", tablet_id)); |
| ASSERT_FALSE(env_->FileExists(wal_dir)); |
| const string& meta_dir = JoinPathSegments(tserver_dir, |
| Substitute("tablet-meta/$0", tablet_id)); |
| ASSERT_FALSE(env_->FileExists(meta_dir)); |
| |
| // Verify that the total size of the data on disk after 'delete' action |
| // is less than before. Although this doesn't necessarily check |
| // for orphan data blocks left behind for the given tablet, it certainly |
| // indicates that some data has been deleted from disk. |
| uint64_t size_after_delete; |
| ASSERT_OK(env_->GetFileSizeOnDiskRecursively(data_dir, &size_after_delete)); |
| ASSERT_LT(size_after_delete, size_before_delete); |
| |
| // Since the only tablet on the node was deleted by the tool, we expect |
| // the tablet server to host nothing after it comes back up. |
| ASSERT_OK(ts->Start()); |
| ASSERT_OK(ts->WaitStarted()); |
| vector<scoped_refptr<TabletReplica>> tablet_replicas; |
| ts->server()->tablet_manager()->GetTabletReplicas(&tablet_replicas); |
| ASSERT_EQ(0, tablet_replicas.size()); |
| } |
| |
| // Test 'kudu local_replica delete' tool for tombstoning the tablet. |
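| // |
| // Same invocation as in the previous test but without --clean_unsafe, which |
| // tombstones the replica instead of removing its data outright (values |
| // hypothetical): |
| // |
| //   kudu local_replica delete <tablet_id> --fs_wal_dir=<dir> --fs_data_dirs=<dir> |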
| TEST_F(ToolTest, TestLocalReplicaTombstoneDelete) { |
| NO_FATALS(StartMiniCluster()); |
| |
| // TestWorkload::Setup() internally creates a table. |
| TestWorkload workload(mini_cluster_.get()); |
| workload.set_num_replicas(1); |
| workload.Setup(); |
| workload.Start(); |
| |
| // There is only one tserver in the minicluster. |
| ASSERT_OK(mini_cluster_->WaitForTabletServerCount(1)); |
| MiniTabletServer* ts = mini_cluster_->mini_tablet_server(0); |
| |
| // Generate some workload followed by a flush to grow the size of the tablet on disk. |
| while (workload.rows_inserted() < 10000) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| workload.StopAndJoin(); |
| |
| // Make sure the tablet data is flushed to disk. This is needed |
| // so that we can compare the size of the data on disk before and |
| // after the deletion of local_replica to verify that the size-on-disk |
| // is reduced after the tool operation. |
| optional<OpId> last_logged_opid; |
| string tablet_id; |
| { |
| vector<scoped_refptr<TabletReplica>> tablet_replicas; |
| ts->server()->tablet_manager()->GetTabletReplicas(&tablet_replicas); |
| ASSERT_EQ(1, tablet_replicas.size()); |
| tablet_id = tablet_replicas[0]->tablet_id(); |
| last_logged_opid = tablet_replicas[0]->shared_consensus()->GetLastOpId(RECEIVED_OPID); |
| Tablet* tablet = tablet_replicas[0]->tablet(); |
| ASSERT_OK(tablet->Flush()); |
| } |
| const string& tserver_dir = ts->options()->fs_opts.wal_root; |
| |
| // Shut down tablet server and use the delete tool. |
| ts->Shutdown(); |
| const string& data_dir = JoinPathSegments(tserver_dir, "data"); |
| uint64_t size_before_delete; |
| ASSERT_OK(env_->GetFileSizeOnDiskRecursively(data_dir, &size_before_delete)); |
| NO_FATALS(RunActionStdoutNone(Substitute("local_replica delete $0 --fs_wal_dir=$1 " |
| "--fs_data_dirs=$1", |
| tablet_id, tserver_dir))); |
| // Verify WAL segments for the tablet_id are gone and |
| // the data_dir size on tserver is reduced. |
| const string& wal_dir = JoinPathSegments(tserver_dir, |
| Substitute("wals/$0", tablet_id)); |
| ASSERT_FALSE(env_->FileExists(wal_dir)); |
| uint64_t size_after_delete; |
| ASSERT_OK(env_->GetFileSizeOnDiskRecursively(data_dir, &size_after_delete)); |
| ASSERT_LT(size_after_delete, size_before_delete); |
| |
| // Bring up the tablet server and verify that the tablet is tombstoned and |
| // that tombstone_last_logged_opid matches the one the test cached earlier. |
| ASSERT_OK(ts->Start()); |
| ASSERT_OK(ts->WaitStarted()); |
| { |
| vector<scoped_refptr<TabletReplica>> tablet_replicas; |
| ts->server()->tablet_manager()->GetTabletReplicas(&tablet_replicas); |
| ASSERT_EQ(1, tablet_replicas.size()); |
| ASSERT_EQ(tablet_id, tablet_replicas[0]->tablet_id()); |
| ASSERT_EQ(TabletDataState::TABLET_DATA_TOMBSTONED, |
| tablet_replicas[0]->tablet_metadata()->tablet_data_state()); |
| optional<OpId> tombstoned_opid = |
| tablet_replicas[0]->tablet_metadata()->tombstone_last_logged_opid(); |
| ASSERT_NE(boost::none, tombstoned_opid); |
| ASSERT_NE(boost::none, last_logged_opid); |
| ASSERT_EQ(last_logged_opid->term(), tombstoned_opid->term()); |
| ASSERT_EQ(last_logged_opid->index(), tombstoned_opid->index()); |
| } |
| } |
| |
| // Test for 'local_replica cmeta' functionality. |
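| // |
| // The subcommands exercised below, sketched as CLI invocations (directory, |
| // tablet id, and term values are hypothetical): |
| // |
| //   kudu local_replica cmeta print_replica_uuids -fs_wal_dir=<dir> <tablet_id> |
| //   kudu local_replica cmeta set-term -fs_wal_dir=<dir> <tablet_id> <term> |
| //   kudu pbc dump <path_to_cmeta_file>   (to inspect the stored term) |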
| TEST_F(ToolTest, TestLocalReplicaCMetaOps) { |
| NO_FATALS(StartMiniCluster()); |
| |
| // TestWorkload::Setup() internally creates a table. |
| TestWorkload workload(mini_cluster_.get()); |
| workload.set_num_replicas(1); |
| workload.Setup(); |
| MiniTabletServer* ts = mini_cluster_->mini_tablet_server(0); |
| const string ts_uuid = ts->uuid(); |
| const string& flags = Substitute("-fs-wal-dir $0", ts->options()->fs_opts.wal_root); |
| string tablet_id; |
| { |
| vector<string> tablets; |
| NO_FATALS(RunActionStdoutLines(Substitute("local_replica list $0", flags), &tablets)); |
| ASSERT_EQ(1, tablets.size()); |
| tablet_id = tablets[0]; |
| } |
| const auto& cmeta_path = ts->server()->fs_manager()->GetConsensusMetadataPath(tablet_id); |
| |
| ts->Shutdown(); |
| |
| // Test print_replica_uuids. |
| // We only have a single replica, so we expect one line, with our server's UUID. |
| { |
| vector<string> uuids; |
| NO_FATALS(RunActionStdoutLines(Substitute("local_replica cmeta print_replica_uuids $0 $1", |
| flags, tablet_id), &uuids)); |
| ASSERT_EQ(1, uuids.size()); |
| EXPECT_EQ(ts_uuid, uuids[0]); |
| } |
| |
| // Test using set-term to bump the term to 123. |
| { |
| NO_FATALS(RunActionStdoutNone(Substitute("local_replica cmeta set-term $0 $1 123", |
| flags, tablet_id))); |
| |
| string stdout; |
| NO_FATALS(RunActionStdoutString(Substitute("pbc dump $0", cmeta_path), |
| &stdout)); |
| ASSERT_STR_CONTAINS(stdout, "current_term: 123"); |
| } |
| |
| // Test that set-term refuses to decrease the term. |
| { |
| string stdout, stderr; |
| Status s = RunTool(Substitute("local_replica cmeta set-term $0 $1 10", |
| flags, tablet_id), |
| &stdout, &stderr, |
| /* stdout_lines = */ nullptr, |
| /* stderr_lines = */ nullptr); |
| EXPECT_FALSE(s.ok()); |
| EXPECT_EQ("", stdout); |
| EXPECT_THAT(stderr, testing::HasSubstr( |
| "specified term 10 must be higher than current term 123")); |
| } |
| } |
| |
| TEST_F(ToolTest, TestTserverList) { |
| NO_FATALS(StartExternalMiniCluster()); |
| |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| const auto& tserver = cluster_->tablet_server(0); |
| |
| { // TSV |
| string out; |
| NO_FATALS(RunActionStdoutString(Substitute("tserver list $0 --columns=uuid --format=tsv", |
| master_addr), |
| &out)); |
| |
| ASSERT_EQ(tserver->uuid(), out); |
| } |
| |
| { // JSON |
| string out; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("tserver list $0 --columns=uuid,rpc-addresses --format=json", master_addr), |
| &out)); |
| |
| ASSERT_EQ(Substitute("[{\"uuid\":\"$0\",\"rpc-addresses\":\"$1\"}]", |
| tserver->uuid(), tserver->bound_rpc_hostport().ToString()), |
| out); |
| } |
| |
| { // Pretty |
| string out; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("tserver list $0 --columns=uuid,rpc-addresses", master_addr), |
| &out)); |
| |
| ASSERT_STR_CONTAINS(out, tserver->uuid()); |
| ASSERT_STR_CONTAINS(out, tserver->bound_rpc_hostport().ToString()); |
| } |
| |
| { // Add a tserver |
| ASSERT_OK(cluster_->AddTabletServer()); |
| |
| vector<string> lines; |
| NO_FATALS(RunActionStdoutLines( |
| Substitute("tserver list $0 --columns=uuid --format=space", master_addr), |
| &lines)); |
| |
| vector<string> expected = { |
| tserver->uuid(), |
| cluster_->tablet_server(1)->uuid(), |
| }; |
| |
| std::sort(lines.begin(), lines.end()); |
| std::sort(expected.begin(), expected.end()); |
| |
| ASSERT_EQ(expected, lines); |
| } |
| } |
| |
| TEST_F(ToolTest, TestTserverListLocationAssigned) { |
| const string kLocationCmdPath = JoinPathSegments(GetTestExecutableDirectory(), |
| "testdata/first_argument.sh"); |
| const string location = "/foo-bar0/BAAZ._9-quux"; |
| ExternalMiniClusterOptions opts; |
| opts.extra_master_flags.emplace_back( |
| Substitute("--location_mapping_cmd=$0 $1", kLocationCmdPath, location)); |
| |
| NO_FATALS(StartExternalMiniCluster(opts)); |
| |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| |
| string out; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("tserver list $0 --columns=uuid,location --format=csv", master_addr), &out)); |
| |
| ASSERT_STR_CONTAINS(out, cluster_->tablet_server(0)->uuid() + "," + location); |
| } |
| |
| TEST_F(ToolTest, TestTserverListLocationNotAssigned) { |
| NO_FATALS(StartExternalMiniCluster()); |
| |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| |
| string out; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("tserver list $0 --columns=uuid,location --format=csv", master_addr), &out)); |
| |
| ASSERT_STR_CONTAINS(out, cluster_->tablet_server(0)->uuid() + ",<none>"); |
| } |
| |
| TEST_F(ToolTest, TestMasterList) { |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = 0; |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| auto* master = cluster_->master(); |
| |
| string out; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("master list $0 --columns=uuid,rpc-addresses", master_addr), |
| &out)); |
| |
| ASSERT_STR_CONTAINS(out, master->uuid()); |
| ASSERT_STR_CONTAINS(out, master->bound_rpc_hostport().ToString()); |
| } |
| |
| // Operate on Kudu tables (the corresponding CLI invocations are sketched |
| // below): |
| // (1) delete a table |
| // (2) rename a table |
| // (3) rename a column |
| // (4) list tables |
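| // |
| //   kudu table delete <master_addrs> <table_name> |
| //   kudu table rename_table <master_addrs> <old_table_name> <new_table_name> |
| //   kudu table rename_column <master_addrs> <table_name> <old_col> <new_col> |
| //   kudu table list <master_addrs> [-list_tablets] |
| // |
| // (Names and addresses above are placeholders, not values used by the tests.) |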
| TEST_F(ToolTest, TestDeleteTable) { |
| NO_FATALS(StartExternalMiniCluster()); |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &client)); |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| |
| const string& kTableName = "kudu.table"; |
| |
| // Create a table. |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableName); |
| workload.set_num_replicas(1); |
| workload.Setup(); |
| |
| // Check that the table exists. |
| bool exists = false; |
| ASSERT_OK(client->TableExists(kTableName, &exists)); |
| ASSERT_TRUE(exists); |
| |
| // Delete the table. |
| NO_FATALS(RunActionStdoutNone(Substitute("table delete $0 $1 --nomodify_external_catalogs", |
| master_addr, kTableName))); |
| |
| // Check that the table does not exist. |
| ASSERT_OK(client->TableExists(kTableName, &exists)); |
| ASSERT_FALSE(exists); |
| } |
| |
| TEST_F(ToolTest, TestRenameTable) { |
| NO_FATALS(StartExternalMiniCluster()); |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &client)); |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| |
| const string& kTableName = "kudu.table"; |
| const string& kNewTableName = "kudu_table"; |
| |
| // Create the table. |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableName); |
| workload.set_num_replicas(1); |
| workload.Setup(); |
| |
| NO_FATALS(RunActionStdoutNone(Substitute("table rename_table $0 $1 $2", |
| master_addr, kTableName, |
| kNewTableName))); |
| shared_ptr<KuduTable> table; |
| ASSERT_OK(client->OpenTable(kNewTableName, &table)); |
| |
| NO_FATALS(RunActionStdoutNone( |
| Substitute("table rename_table $0 $1 $2 --nomodify_external_catalogs", |
| master_addr, kNewTableName, kTableName))); |
| ASSERT_OK(client->OpenTable(kTableName, &table)); |
| } |
| |
| TEST_F(ToolTest, TestRenameColumn) { |
| NO_FATALS(StartExternalMiniCluster()); |
| const string& kTableName = "table"; |
| const string& kColumnName = "col.0"; |
| const string& kNewColumnName = "col_0"; |
| |
| KuduSchemaBuilder schema_builder; |
| schema_builder.AddColumn("key") |
| ->Type(client::KuduColumnSchema::INT32) |
| ->NotNull() |
| ->PrimaryKey(); |
| schema_builder.AddColumn(kColumnName) |
| ->Type(client::KuduColumnSchema::INT32) |
| ->NotNull(); |
| KuduSchema schema; |
| ASSERT_OK(schema_builder.Build(&schema)); |
| |
| // Create the table. |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableName); |
| workload.set_schema(schema); |
| workload.set_num_replicas(1); |
| workload.Setup(); |
| |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| NO_FATALS(RunActionStdoutNone(Substitute("table rename_column $0 $1 $2 $3", |
| master_addr, kTableName, |
| kColumnName, kNewColumnName))); |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(KuduClientBuilder() |
| .add_master_server_addr(master_addr) |
| .Build(&client)); |
| shared_ptr<KuduTable> table; |
| ASSERT_OK(client->OpenTable(kTableName, &table)); |
| ASSERT_STR_CONTAINS(table->schema().ToString(), kNewColumnName); |
| } |
| |
| TEST_F(ToolTest, TestListTables) { |
| NO_FATALS(StartExternalMiniCluster()); |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &client)); |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| |
| // Expected format of the replica lines. |
| string ts_uuid = cluster_->tablet_server(0)->uuid(); |
| string ts_addr = cluster_->tablet_server(0)->bound_rpc_addr().ToString(); |
| string expect_replica = Substitute(" L $0 $1", ts_uuid, ts_addr); |
| |
| // Create some tables. |
| const int kNumTables = 10; |
| vector<string> table_names; |
| for (int i = 0; i < kNumTables; ++i) { |
| string table_name = Substitute("kudu.table_$0", i); |
| table_names.push_back(table_name); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(table_name); |
| workload.set_num_replicas(1); |
| workload.Setup(); |
| } |
| std::sort(table_names.begin(), table_names.end()); |
| |
| const auto& ProcessTables = [&] (const int num) { |
| ASSERT_GE(num, 1); |
| ASSERT_LE(num, kNumTables); |
| |
| vector<string> expected; |
| expected.insert(expected.end(), table_names.begin(), table_names.begin() + num); |
| |
| string filter = ""; |
| if (kNumTables != num) { |
| filter = Substitute("-tables=$0", JoinStrings(expected, ",")); |
| } |
| vector<string> lines; |
| NO_FATALS(RunActionStdoutLines( |
| Substitute("table list $0 $1", master_addr, filter), &lines)); |
| |
| std::sort(lines.begin(), lines.end()); |
| ASSERT_EQ(expected, lines); |
| }; |
| |
| const auto& ProcessTablets = [&] (const int num) { |
| ASSERT_GE(num, 1); |
| ASSERT_LE(num, kNumTables); |
| |
| string filter = ""; |
| if (kNumTables != num) { |
| filter = Substitute("-tables=$0", |
| JoinStringsIterator(table_names.begin(), table_names.begin() + num, ",")); |
| } |
| vector<string> lines; |
| NO_FATALS(RunActionStdoutLines( |
| Substitute("table list $0 $1 -list_tablets", master_addr, filter), &lines)); |
| |
| map<string, pair<string, string>> output; |
| for (int i = 0; i < lines.size(); ++i) { |
| if (lines[i].empty()) continue; |
| ASSERT_LT(i + 2, lines.size()); |
| output[lines[i]] = pair<string, string>(lines[i + 1], lines[i + 2]); |
| i += 2; |
| } |
| |
| for (const auto& e : output) { |
| shared_ptr<KuduTable> table; |
| ASSERT_OK(client->OpenTable(e.first, &table)); |
| vector<KuduScanToken*> tokens; |
| ElementDeleter deleter(&tokens); |
| KuduScanTokenBuilder builder(table.get()); |
| ASSERT_OK(builder.Build(&tokens)); |
| ASSERT_EQ(1, tokens.size()); // Only one partition (tablet) per table. |
| // Tablet's format. |
| string expect_tablet = Substitute(" T $0", tokens[0]->tablet().id()); |
| ASSERT_EQ(expect_tablet, e.second.first); |
| ASSERT_EQ(expect_replica, e.second.second); |
| } |
| }; |
| |
| // List the tables and tablets. |
| for (int i = 1; i <= kNumTables; ++i) { |
| ProcessTables(i); |
| ProcessTablets(i); |
| } |
| } |
| |
| Status CreateLegacyHmsTable(HmsClient* client, |
| const string& hms_database_name, |
| const string& hms_table_name, |
| const string& kudu_table_name, |
| const string& kudu_master_addrs, |
| const string& table_type, |
| const optional<const string&>& owner) { |
| hive::Table table; |
| table.dbName = hms_database_name; |
| table.tableName = hms_table_name; |
| table.tableType = table_type; |
| if (owner) { |
| table.owner = *owner; |
| } |
| |
| table.__set_parameters({ |
| make_pair(HmsClient::kStorageHandlerKey, HmsClient::kLegacyKuduStorageHandler), |
| make_pair(HmsClient::kLegacyKuduTableNameKey, kudu_table_name), |
| make_pair(HmsClient::kKuduMasterAddrsKey, kudu_master_addrs), |
| }); |
| |
| // TODO(Hao): Remove this once HIVE-19253 is fixed. |
| if (table_type == HmsClient::kExternalTable) { |
| table.parameters[HmsClient::kExternalTableKey] = "TRUE"; |
| } |
| |
| return client->CreateTable(table); |
| } |
| |
| Status CreateHmsTable(HmsClient* client, |
| const string& database_name, |
| const string& table_name, |
| const string& table_type, |
| const string& master_addresses, |
| const string& table_id) { |
| hive::Table table; |
| table.dbName = database_name; |
| table.tableName = table_name; |
| table.tableType = table_type; |
| |
| table.__set_parameters({ |
| make_pair(HmsClient::kStorageHandlerKey, HmsClient::kKuduStorageHandler), |
| make_pair(HmsClient::kKuduTableIdKey, table_id), |
| make_pair(HmsClient::kKuduMasterAddrsKey, master_addresses), |
| }); |
| |
| // TODO(Hao): Remove this once HIVE-19253 is fixed. |
| if (table_type == HmsClient::kExternalTable) { |
| table.parameters[HmsClient::kExternalTableKey] = "TRUE"; |
| } |
| |
| hive::EnvironmentContext env_ctx; |
| env_ctx.__set_properties({ make_pair(HmsClient::kKuduMasterEventKey, "true") }); |
| return client->CreateTable(table, env_ctx); |
| } |
| |
| Status CreateKuduTable(const shared_ptr<KuduClient>& kudu_client, |
| const string& table_name) { |
| KuduSchemaBuilder schema_builder; |
| schema_builder.AddColumn("foo") |
| ->Type(client::KuduColumnSchema::INT32) |
| ->NotNull() |
| ->PrimaryKey(); |
| KuduSchema schema; |
| RETURN_NOT_OK(schema_builder.Build(&schema)); |
| unique_ptr<client::KuduTableCreator> table_creator(kudu_client->NewTableCreator()); |
| return table_creator->table_name(table_name) |
| .schema(&schema) |
| .num_replicas(1) |
| .set_range_partition_columns({ "foo" }) |
| .Create(); |
| } |
| |
| bool IsValidTableName(const string& table_name) { |
| vector<string> identifiers = strings::Split(table_name, "."); |
| return identifiers.size() == 2 && |
| !HasPrefixString(table_name, HmsClient::kLegacyTablePrefix); |
| } |
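| |
| // For example, assuming HmsClient::kLegacyTablePrefix is the legacy |
| // "impala::" prefix: IsValidTableName("default.foo") is true, while |
| // IsValidTableName("foo") (no database) and |
| // IsValidTableName("impala::default.foo") (legacy prefix) are both false. |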
| |
| void ValidateHmsEntries(HmsClient* hms_client, |
| const shared_ptr<KuduClient>& kudu_client, |
| const string& database_name, |
| const string& table_name, |
| const string& master_addr) { |
| hive::Table hms_table; |
| ASSERT_OK(hms_client->GetTable(database_name, table_name, &hms_table)); |
| shared_ptr<KuduTable> kudu_table; |
| ASSERT_OK(kudu_client->OpenTable(Substitute("$0.$1", database_name, table_name), &kudu_table)); |
| ASSERT_EQ(hms_table.parameters[HmsClient::kStorageHandlerKey], HmsClient::kKuduStorageHandler); |
| ASSERT_EQ(hms_table.parameters[HmsClient::kKuduTableIdKey], kudu_table->id()); |
| ASSERT_EQ(hms_table.parameters[HmsClient::kKuduMasterAddrsKey], master_addr); |
| ASSERT_TRUE(!ContainsKey(hms_table.parameters, HmsClient::kLegacyKuduTableNameKey)); |
| } |
| |
| TEST_P(ToolTestKerberosParameterized, TestHmsDowngrade) { |
| ExternalMiniClusterOptions opts; |
| opts.hms_mode = HmsMode::ENABLE_METASTORE_INTEGRATION; |
| opts.enable_kerberos = EnableKerberos(); |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| thrift::ClientOptions hms_opts; |
| hms_opts.enable_kerberos = EnableKerberos(); |
| hms_opts.service_principal = "hive"; |
| HmsClient hms_client(cluster_->hms()->address(), hms_opts); |
| ASSERT_OK(hms_client.Start()); |
| ASSERT_TRUE(hms_client.IsConnected()); |
| shared_ptr<KuduClient> kudu_client; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &kudu_client)); |
| |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.a")); |
| NO_FATALS(ValidateHmsEntries(&hms_client, kudu_client, "default", "a", master_addr)); |
| |
| // Downgrade to legacy table in both Hive Metastore and Kudu. |
| // --hive_metastore_uris and --hive_metastore_sasl_enabled are automatically |
| // looked up from the master. |
| NO_FATALS(RunActionStdoutNone(Substitute("hms downgrade $0", master_addr))); |
| |
| // The check tool should report the legacy table. |
| string out; |
| string err; |
| Status s = RunActionStdoutStderrString(Substitute("hms check $0", master_addr), &out, &err); |
| ASSERT_FALSE(s.ok()); |
| ASSERT_STR_CONTAINS(out, hms::HmsClient::kLegacyKuduStorageHandler); |
| ASSERT_STR_CONTAINS(out, "default.a"); |
| |
| // The table should still be accessible in both Kudu and the HMS. |
| shared_ptr<KuduTable> kudu_table; |
| ASSERT_OK(kudu_client->OpenTable("default.a", &kudu_table)); |
| hive::Table hms_table; |
| ASSERT_OK(hms_client.GetTable("default", "a", &hms_table)); |
| |
| // Check that re-upgrading works as expected. |
| NO_FATALS(RunActionStdoutNone(Substitute("hms fix $0", master_addr))); |
| NO_FATALS(RunActionStdoutNone(Substitute("hms check $0", master_addr))); |
| } |
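| |
| // The downgrade/check/fix cycle above corresponds to the following CLI |
| // invocations (master address is a placeholder): |
| // |
| //   kudu hms downgrade <master_addrs> |
| //   kudu hms check <master_addrs> |
| //   kudu hms fix <master_addrs> |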
| |
| // Test HMS inconsistencies that can be automatically fixed. |
| // Kerberos is enabled in order to test that the tools work in secure clusters. |
| TEST_P(ToolTestKerberosParameterized, TestCheckAndAutomaticFixHmsMetadata) { |
| const string kUsername = "alice"; |
| ExternalMiniClusterOptions opts; |
| opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE; |
| opts.enable_kerberos = EnableKerberos(); |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| thrift::ClientOptions hms_opts; |
| hms_opts.enable_kerberos = EnableKerberos(); |
| hms_opts.service_principal = "hive"; |
| HmsClient hms_client(cluster_->hms()->address(), hms_opts); |
| ASSERT_OK(hms_client.Start()); |
| ASSERT_TRUE(hms_client.IsConnected()); |
| |
| FLAGS_hive_metastore_uris = cluster_->hms()->uris(); |
| FLAGS_hive_metastore_sasl_enabled = EnableKerberos(); |
| HmsCatalog hms_catalog(master_addr); |
| ASSERT_OK(hms_catalog.Start()); |
| |
| shared_ptr<KuduClient> kudu_client; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &kudu_client)); |
| |
| // While the metastore integration is disabled, create tables in Kudu and the |
| // HMS with inconsistent metadata. |
| |
| // Control case: the check tool should not flag this table. |
| shared_ptr<KuduTable> control; |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.control")); |
| ASSERT_OK(kudu_client->OpenTable("default.control", &control)); |
| ASSERT_OK(hms_catalog.CreateTable( |
| control->id(), control->name(), kUsername, |
| client::SchemaFromKuduSchema(control->schema()))); |
| |
| // Test case: Upper-case names are handled specially in a few places. |
| shared_ptr<KuduTable> test_uppercase; |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.UPPERCASE")); |
| ASSERT_OK(kudu_client->OpenTable("default.UPPERCASE", &test_uppercase)); |
| ASSERT_OK(hms_catalog.CreateTable( |
| test_uppercase->id(), test_uppercase->name(), kUsername, |
| client::SchemaFromKuduSchema(test_uppercase->schema()))); |
| |
| // Test case: inconsistent schema. |
| shared_ptr<KuduTable> inconsistent_schema; |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_schema")); |
| ASSERT_OK(kudu_client->OpenTable("default.inconsistent_schema", &inconsistent_schema)); |
| ASSERT_OK(hms_catalog.CreateTable( |
| inconsistent_schema->id(), inconsistent_schema->name(), kUsername, |
| SchemaBuilder().Build())); |
| |
| // Test case: inconsistent name. |
| shared_ptr<KuduTable> inconsistent_name; |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_name")); |
| ASSERT_OK(kudu_client->OpenTable("default.inconsistent_name", &inconsistent_name)); |
| ASSERT_OK(hms_catalog.CreateTable( |
| inconsistent_name->id(), "default.inconsistent_name_hms", kUsername, |
| client::SchemaFromKuduSchema(inconsistent_name->schema()))); |
| |
| // Test case: inconsistent master addresses. |
| shared_ptr<KuduTable> inconsistent_master_addrs; |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_master_addrs")); |
| ASSERT_OK(kudu_client->OpenTable("default.inconsistent_master_addrs", |
| &inconsistent_master_addrs)); |
| HmsCatalog invalid_hms_catalog("invalid-master-addrs"); |
| ASSERT_OK(invalid_hms_catalog.Start()); |
| ASSERT_OK(invalid_hms_catalog.CreateTable( |
| inconsistent_master_addrs->id(), inconsistent_master_addrs->name(), kUsername, |
| client::SchemaFromKuduSchema(inconsistent_master_addrs->schema()))); |
| |
| // Test cases: orphan tables in the HMS. |
| ASSERT_OK(hms_catalog.CreateTable( |
| "orphan-hms-table-id", "default.orphan_hms_table", kUsername, |
| SchemaBuilder().Build())); |
| ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "orphan_hms_table_legacy_external", |
| "default.orphan_hms_table_legacy_external", |
| master_addr, HmsClient::kExternalTable, kUsername)); |
| ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "orphan_hms_table_legacy_managed", |
| "impala::default.orphan_hms_table_legacy_managed", |
| master_addr, HmsClient::kManagedTable, kUsername)); |
| |
| // Test case: orphan table in Kudu. |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.kudu_orphan")); |
| |
| // Test case: legacy external table. |
| shared_ptr<KuduTable> legacy_external; |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.legacy_external")); |
| ASSERT_OK(kudu_client->OpenTable("default.legacy_external", &legacy_external)); |
| ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "legacy_external", |
| "default.legacy_external", master_addr, HmsClient::kExternalTable, kUsername)); |
| |
| // Test case: legacy managed table. |
| shared_ptr<KuduTable> legacy_managed; |
| ASSERT_OK(CreateKuduTable(kudu_client, "impala::default.legacy_managed")); |
| ASSERT_OK(kudu_client->OpenTable("impala::default.legacy_managed", &legacy_managed)); |
| ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "legacy_managed", |
| "impala::default.legacy_managed", master_addr, HmsClient::kManagedTable, kUsername)); |
| |
| // Test case: legacy managed table with no owner. |
| shared_ptr<KuduTable> legacy_no_owner; |
| ASSERT_OK(CreateKuduTable(kudu_client, "impala::default.legacy_no_owner")); |
| ASSERT_OK(kudu_client->OpenTable("impala::default.legacy_no_owner", &legacy_no_owner)); |
| ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "legacy_no_owner", |
| "impala::default.legacy_no_owner", master_addr, HmsClient::kManagedTable, boost::none)); |
| |
| // Test case: legacy external table with a Hive-incompatible name (no database). |
| shared_ptr<KuduTable> legacy_external_hive_incompatible_name; |
| ASSERT_OK(CreateKuduTable(kudu_client, "legacy_external_hive_incompatible_name")); |
| ASSERT_OK(kudu_client->OpenTable("legacy_external_hive_incompatible_name", |
| &legacy_external_hive_incompatible_name)); |
| ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "legacy_external_hive_incompatible_name", |
| "legacy_external_hive_incompatible_name", master_addr, |
| HmsClient::kExternalTable, kUsername)); |
| |
| // Test case: Kudu table in non-default database. |
| hive::Database db; |
| db.name = "my_db"; |
| ASSERT_OK(hms_client.CreateDatabase(db)); |
| ASSERT_OK(CreateKuduTable(kudu_client, "my_db.table")); |
| |
| // Enable the HMS integration. |
| cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY); |
| cluster_->EnableMetastoreIntegration(); |
| ASSERT_OK(cluster_->Restart()); |
| |
| unordered_set<string> consistent_tables = { |
| "default.control", |
| }; |
| |
| unordered_set<string> inconsistent_tables = { |
| "default.UPPERCASE", |
| "default.inconsistent_schema", |
| "default.inconsistent_name", |
| "default.inconsistent_master_addrs", |
| "default.orphan_hms_table", |
| "default.orphan_hms_table_legacy_external", |
| "default.orphan_hms_table_legacy_managed", |
| "default.kudu_orphan", |
| "default.legacy_external", |
| "default.legacy_managed", |
| "default.legacy_no_owner", |
| "legacy_external_hive_incompatible_name", |
| "my_db.table", |
| }; |
| |
| // Move a list of tables from the inconsistent set to the consistent set. |
| auto make_consistent = [&] (const vector<string>& tables) { |
| for (const string& table : tables) { |
| ASSERT_EQ(inconsistent_tables.erase(table), 1); |
| } |
| consistent_tables.insert(tables.begin(), tables.end()); |
| }; |
| |
| // Run the HMS check tool and verify that the consistent tables are not |
| // reported, and the inconsistent tables are reported. |
| auto check = [&] () { |
| string out; |
| string err; |
| Status s = RunActionStdoutStderrString(Substitute("hms check $0", master_addr), &out, &err); |
| SCOPED_TRACE(strings::CUnescapeOrDie(out)); |
| if (inconsistent_tables.empty()) { |
| ASSERT_OK(s); |
| ASSERT_STR_NOT_CONTAINS(err, "found inconsistencies in the Kudu and HMS catalogs"); |
| } else { |
| ASSERT_FALSE(s.ok()); |
| ASSERT_STR_CONTAINS(err, "found inconsistencies in the Kudu and HMS catalogs"); |
| } |
| for (const string& table : consistent_tables) { |
| ASSERT_STR_NOT_CONTAINS(out, table); |
| } |
| for (const string& table : inconsistent_tables) { |
| ASSERT_STR_CONTAINS(out, table); |
| } |
| }; |
| |
| // 'hms check' should point out all of the test-case tables, but not the control tables. |
| NO_FATALS(check()); |
| |
| // 'hms fix --dryrun' should not change the output of 'hms check'. |
| NO_FATALS(RunActionStdoutNone( |
| Substitute("hms fix $0 --dryrun --drop_orphan_hms_tables", master_addr))); |
| NO_FATALS(check()); |
| |
| // Drop orphan tables. |
| NO_FATALS(RunActionStdoutNone( |
| Substitute("hms fix $0 --drop_orphan_hms_tables --nocreate_missing_hms_tables " |
| "--noupgrade_hms_tables --nofix_inconsistent_tables", master_addr))); |
| make_consistent({ |
| "default.orphan_hms_table", |
| "default.orphan_hms_table_legacy_external", |
| "default.orphan_hms_table_legacy_managed", |
| }); |
| NO_FATALS(check()); |
| |
| // Create missing HMS tables. |
| NO_FATALS(RunActionStdoutNone( |
| Substitute("hms fix $0 --noupgrade_hms_tables --nofix_inconsistent_tables", master_addr))); |
| make_consistent({ |
| "default.kudu_orphan", |
| "my_db.table", |
| }); |
| NO_FATALS(check()); |
| |
| // Upgrade legacy HMS tables. |
| NO_FATALS(RunActionStdoutNone( |
| Substitute("hms fix $0 --nofix_inconsistent_tables", master_addr))); |
| make_consistent({ |
| "default.legacy_external", |
| "default.legacy_managed", |
| "default.legacy_no_owner", |
| "legacy_external_hive_incompatible_name", |
| }); |
| NO_FATALS(check()); |
| |
| // Refresh stale HMS tables. |
| NO_FATALS(RunActionStdoutNone(Substitute("hms fix $0", master_addr))); |
| make_consistent({ |
| "default.UPPERCASE", |
| "default.inconsistent_schema", |
| "default.inconsistent_name", |
| "default.inconsistent_master_addrs", |
| }); |
| NO_FATALS(check()); |
| |
| ASSERT_TRUE(inconsistent_tables.empty()); |
| |
| for (const string& table : { |
| "control", |
| "uppercase", |
| "inconsistent_schema", |
| "inconsistent_name_hms", |
| "inconsistent_master_addrs", |
| "kudu_orphan", |
| "legacy_external", |
| "legacy_managed", |
| "legacy_external_hive_incompatible_name", |
| }) { |
| NO_FATALS(ValidateHmsEntries(&hms_client, kudu_client, "default", table, master_addr)); |
| } |
| |
| // Validate the tables in the other databases. |
| NO_FATALS(ValidateHmsEntries(&hms_client, kudu_client, "my_db", "table", master_addr)); |
| |
| vector<string> kudu_tables; |
| ASSERT_OK(kudu_client->ListTables(&kudu_tables)); |
| std::sort(kudu_tables.begin(), kudu_tables.end()); |
| ASSERT_EQ(vector<string>({ |
| "default.control", |
| "default.inconsistent_master_addrs", |
| "default.inconsistent_name_hms", |
| "default.inconsistent_schema", |
| "default.kudu_orphan", |
| "default.legacy_external", |
| "default.legacy_external_hive_incompatible_name", |
| "default.legacy_managed", |
| "default.legacy_no_owner", |
| "default.uppercase", |
| "my_db.table", |
| }), kudu_tables); |
| |
| // Check that table ownership is preserved in upgraded legacy tables. |
| for (const auto& p : vector<pair<string, string>>({ |
| make_pair("legacy_external", kUsername), |
| make_pair("legacy_managed", kUsername), |
| make_pair("legacy_no_owner", ""), |
| })) { |
| hive::Table table; |
| ASSERT_OK(hms_client.GetTable("default", p.first, &table)); |
| ASSERT_EQ(p.second, table.owner); |
| } |
| } |
| |
| // Test HMS inconsistencies that must be manually fixed. |
| TEST_P(ToolTestKerberosParameterized, TestCheckAndManualFixHmsMetadata) { |
| const string kUsername = "alice"; |
| ExternalMiniClusterOptions opts; |
| opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE; |
| opts.enable_kerberos = EnableKerberos(); |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| thrift::ClientOptions hms_opts; |
| hms_opts.enable_kerberos = EnableKerberos(); |
| hms_opts.service_principal = "hive"; |
| HmsClient hms_client(cluster_->hms()->address(), hms_opts); |
| ASSERT_OK(hms_client.Start()); |
| ASSERT_TRUE(hms_client.IsConnected()); |
| |
| FLAGS_hive_metastore_uris = cluster_->hms()->uris(); |
| FLAGS_hive_metastore_sasl_enabled = EnableKerberos(); |
| HmsCatalog hms_catalog(master_addr); |
| ASSERT_OK(hms_catalog.Start()); |
| |
| shared_ptr<KuduClient> kudu_client; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &kudu_client)); |
| |
| // While the metastore integration is disabled, create tables in Kudu and the |
| // HMS with inconsistent metadata. |
| |
| // Test case: Multiple HMS tables pointing to a single Kudu table. |
| shared_ptr<KuduTable> duplicate_hms_tables; |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.duplicate_hms_tables")); |
| ASSERT_OK(kudu_client->OpenTable("default.duplicate_hms_tables", &duplicate_hms_tables)); |
| ASSERT_OK(hms_catalog.CreateTable( |
| duplicate_hms_tables->id(), "default.duplicate_hms_tables", kUsername, |
| client::SchemaFromKuduSchema(duplicate_hms_tables->schema()))); |
| ASSERT_OK(hms_catalog.CreateTable( |
| duplicate_hms_tables->id(), "default.duplicate_hms_tables_2", kUsername, |
| client::SchemaFromKuduSchema(duplicate_hms_tables->schema()))); |
| ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "duplicate_hms_tables_3", |
| "default.duplicate_hms_tables", |
| master_addr, HmsClient::kExternalTable, kUsername)); |
| |
| // Test case: Kudu tables with Hive-incompatible names. |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.hive-incompatible-name")); |
| ASSERT_OK(CreateKuduTable(kudu_client, "no_database")); |
| |
| // Test case: Kudu table in non-existent database. |
| ASSERT_OK(CreateKuduTable(kudu_client, "non_existent_database.table")); |
| |
| // Test case: a legacy table with a Hive name which conflicts with another table in Kudu. |
| ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "conflicting_legacy_table", |
| "impala::default.conflicting_legacy_table", |
| master_addr, HmsClient::kManagedTable, kUsername)); |
| ASSERT_OK(CreateKuduTable(kudu_client, "impala::default.conflicting_legacy_table")); |
| ASSERT_OK(CreateKuduTable(kudu_client, "default.conflicting_legacy_table")); |
| |
| // Enable the HMS integration. |
| cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY); |
| cluster_->EnableMetastoreIntegration(); |
| ASSERT_OK(cluster_->Restart()); |
| |
| // Run the HMS check tool and verify that the inconsistent tables are reported. |
| auto check = [&] () { |
| string out; |
| string err; |
| Status s = RunActionStdoutStderrString(Substitute("hms check $0", master_addr), &out, &err); |
| SCOPED_TRACE(strings::CUnescapeOrDie(out)); |
| for (const string& table : vector<string>({ |
| "duplicate_hms_tables", |
| "duplicate_hms_tables_2", |
| "duplicate_hms_tables_3", |
| "default.hive-incompatible-name", |
| "no_database", |
| "non_existent_database.table", |
| "default.conflicting_legacy_table", |
| })) { |
| ASSERT_STR_CONTAINS(out, table); |
| } |
| }; |
| |
| // Check should recognize these inconsistent tables. |
| NO_FATALS(check()); |
| |
| // Fix should fail, since these are not automatically repairable issues. |
| { |
| string out; |
| string err; |
| Status s = RunActionStdoutStderrString(Substitute("hms fix $0", master_addr), &out, &err); |
| SCOPED_TRACE(strings::CUnescapeOrDie(out)); |
| ASSERT_FALSE(s.ok()); |
| } |
| |
| // Check should still fail. |
| NO_FATALS(check()); |
| |
| // Manually drop the duplicate HMS entries. |
| ASSERT_OK(hms_catalog.DropTable(duplicate_hms_tables->id(), "default.duplicate_hms_tables_2")); |
| ASSERT_OK(hms_catalog.DropLegacyTable("default.duplicate_hms_tables_3")); |
| |
| // Rename the incompatible names. |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "table rename-table --nomodify-external-catalogs $0 " |
| "default.hive-incompatible-name default.hive_compatible_name", master_addr))); |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "table rename-table --nomodify-external-catalogs $0 " |
| "no_database default.with_database", master_addr))); |
| |
| // Create the missing database. |
| hive::Database db; |
| db.name = "non_existent_database"; |
| ASSERT_OK(hms_client.CreateDatabase(db)); |
| |
| // Rename the conflicting table. |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "table rename-table --nomodify-external-catalogs $0 " |
| "default.conflicting_legacy_table default.non_conflicting_legacy_table", master_addr))); |
| |
| // Run the automatic fixer to create missing HMS table entries. |
| NO_FATALS(RunActionStdoutNone(Substitute("hms fix $0", master_addr))); |
| |
| // Check should now be clean. |
| NO_FATALS(RunActionStdoutNone(Substitute("hms check $0", master_addr))); |
| |
| // Ensure the tables are available. |
| vector<string> kudu_tables; |
| ASSERT_OK(kudu_client->ListTables(&kudu_tables)); |
| std::sort(kudu_tables.begin(), kudu_tables.end()); |
| ASSERT_EQ(vector<string>({ |
| "default.conflicting_legacy_table", |
| "default.duplicate_hms_tables", |
| "default.hive_compatible_name", |
| "default.non_conflicting_legacy_table", |
| "default.with_database", |
| "non_existent_database.table", |
| }), kudu_tables); |
| } |
| |
| TEST_F(ToolTest, TestHmsPrecheck) { |
| ExternalMiniClusterOptions opts; |
| opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE; |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| |
| string master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &client)); |
| |
| // Create test tables. |
| for (const string& table_name : { |
| "a.b", |
| "foo.bar", |
| "FOO.bar", |
| "foo.BAR", |
| "fuzz", |
| "FUZZ", |
| "a.b!", |
| "A.B!", |
| }) { |
| ASSERT_OK(CreateKuduTable(client, table_name)); |
| } |
| |
| // Run the precheck tool. It should complain about the conflicting tables. |
| string out; |
| string err; |
| Status s = RunActionStdoutStderrString(Substitute("hms precheck $0", master_addr), &out, &err); |
| ASSERT_FALSE(s.ok()); |
| ASSERT_STR_CONTAINS(err, "found tables in Kudu with case-conflicting names"); |
| ASSERT_STR_CONTAINS(out, "foo.bar"); |
| ASSERT_STR_CONTAINS(out, "FOO.bar"); |
| ASSERT_STR_CONTAINS(out, "foo.BAR"); |
| |
| // It should not complain about tables which don't have conflicting names. |
| ASSERT_STR_NOT_CONTAINS(out, "a.b"); |
| |
| // It should not complain about tables which have Hive-incompatible names. |
| ASSERT_STR_NOT_CONTAINS(out, "fuzz"); |
| ASSERT_STR_NOT_CONTAINS(out, "FUZZ"); |
| ASSERT_STR_NOT_CONTAINS(out, "a.b!"); |
| ASSERT_STR_NOT_CONTAINS(out, "A.B!"); |
| |
| // Rename the conflicting tables. Use the rename table tool to match the actual workflow. |
| NO_FATALS(RunActionStdoutNone(Substitute("table rename_table $0 FOO.bar foo.bar2", master_addr))); |
| NO_FATALS(RunActionStdoutNone(Substitute("table rename_table $0 foo.BAR foo.bar3", master_addr))); |
| |
| // Precheck should now pass, and the cluster should upgrade successfully. |
| NO_FATALS(RunActionStdoutNone(Substitute("hms precheck $0", master_addr))); |
| |
| // Enable the HMS integration. |
| cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY); |
| cluster_->EnableMetastoreIntegration(); |
| ASSERT_OK(cluster_->Restart()); |
| |
| // Sanity-check the tables. |
| vector<string> tables; |
| ASSERT_OK(client->ListTables(&tables)); |
| std::sort(tables.begin(), tables.end()); |
| ASSERT_EQ(vector<string>({ |
| "A.B!", |
| "FUZZ", |
| "a.b", |
| "a.b!", |
| "foo.bar", |
| "foo.bar2", |
| "foo.bar3", |
| "fuzz", |
| }), tables); |
| } |
| |
| // This test is parameterized on the serialization mode and Kerberos. |
| class ControlShellToolTest : |
| public ToolTest, |
| public ::testing::WithParamInterface<std::tuple<ControlShellProtocol::SerializationMode, |
| bool>> { |
| public: |
| virtual void SetUp() override { |
| ToolTest::SetUp(); |
| |
| // Start the control shell. |
| string mode; |
| switch (serde_mode()) { |
| case ControlShellProtocol::SerializationMode::JSON: mode = "json"; break; |
| case ControlShellProtocol::SerializationMode::PB: mode = "pb"; break; |
| default: LOG(FATAL) << "Unknown serialization mode"; |
| } |
| shell_.reset(new Subprocess({ |
| GetKuduToolAbsolutePath(), |
| "test", |
| "mini_cluster", |
| Substitute("--serialization=$0", mode) |
| })); |
| shell_->ShareParentStdin(false); |
| shell_->ShareParentStdout(false); |
| ASSERT_OK(shell_->Start()); |
| |
| // Start the protocol interface. |
| proto_.reset(new ControlShellProtocol(serde_mode(), |
| ControlShellProtocol::CloseMode::CLOSE_ON_DESTROY, |
| shell_->ReleaseChildStdoutFd(), |
| shell_->ReleaseChildStdinFd())); |
| } |
| |
| virtual void TearDown() override { |
| if (proto_) { |
| // Stopping the protocol interface will close the fds, causing the shell |
| // to exit on its own. |
| proto_.reset(); |
| ASSERT_OK(shell_->Wait()); |
| int exit_status; |
| ASSERT_OK(shell_->GetExitStatus(&exit_status)); |
| ASSERT_EQ(0, exit_status); |
| } |
| ToolTest::TearDown(); |
| } |
| |
| protected: |
| // Send a control message to the shell and receive its response. |
| Status SendReceive(const ControlShellRequestPB& req, ControlShellResponsePB* resp) { |
| RETURN_NOT_OK(proto_->SendMessage(req)); |
| RETURN_NOT_OK(proto_->ReceiveMessage(resp)); |
| if (resp->has_error()) { |
| return StatusFromPB(resp->error()); |
| } |
| return Status::OK(); |
| } |
| |
| ControlShellProtocol::SerializationMode serde_mode() const { |
| return ::testing::get<0>(GetParam()); |
| } |
| |
| bool enable_kerberos() const { |
| return ::testing::get<1>(GetParam()); |
| } |
| |
| unique_ptr<Subprocess> shell_; |
| unique_ptr<ControlShellProtocol> proto_; |
| }; |
| |
| INSTANTIATE_TEST_CASE_P(SerializationModes, ControlShellToolTest, |
| ::testing::Combine(::testing::Values( |
| ControlShellProtocol::SerializationMode::PB, |
| ControlShellProtocol::SerializationMode::JSON), |
| ::testing::Bool())); |
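| |
| // Note: the Combine() above yields four parameter combinations: |
| // {PB, JSON} x {Kerberos enabled, Kerberos disabled}. |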
| |
| TEST_P(ControlShellToolTest, TestControlShell) { |
| const int kNumMasters = 1; |
| const int kNumTservers = 3; |
| |
| // Create an illegal cluster first, to make sure that the shell continues to |
| // work in the event of an error. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_create_cluster()->set_num_masters(4); |
| ASSERT_OK(proto_->SendMessage(req)); |
| ASSERT_OK(proto_->ReceiveMessage(&resp)); |
| ASSERT_TRUE(resp.has_error()); |
| Status s = StatusFromPB(resp.error()); |
| ASSERT_TRUE(s.IsInvalidArgument()); |
| ASSERT_STR_CONTAINS(s.ToString(), "only one or three masters are supported"); |
| } |
| |
| // Create a cluster. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_create_cluster()->set_cluster_root(JoinPathSegments( |
| test_dir_, "minicluster-data")); |
| req.mutable_create_cluster()->set_num_masters(kNumMasters); |
| req.mutable_create_cluster()->set_num_tservers(kNumTservers); |
| req.mutable_create_cluster()->set_enable_kerberos(enable_kerberos()); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| |
| // Try creating a second cluster. It should fail. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_create_cluster()->set_cluster_root(JoinPathSegments( |
| test_dir_, "minicluster-data")); |
| ASSERT_OK(proto_->SendMessage(req)); |
| ASSERT_OK(proto_->ReceiveMessage(&resp)); |
| ASSERT_TRUE(resp.has_error()); |
| Status s = StatusFromPB(resp.error()); |
| ASSERT_TRUE(s.IsInvalidArgument()); |
| ASSERT_STR_CONTAINS(s.ToString(), "cluster already created"); |
| } |
| |
| // Start it. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_start_cluster(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| |
| // Get the masters. |
| vector<DaemonInfoPB> masters; |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_get_masters(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| ASSERT_TRUE(resp.has_get_masters()); |
| ASSERT_EQ(kNumMasters, resp.get_masters().masters_size()); |
| masters.assign(resp.get_masters().masters().begin(), |
| resp.get_masters().masters().end()); |
| } |
| |
| if (enable_kerberos()) { |
| // Set up the KDC environment variables so that a client can authenticate |
| // to this cluster. |
| // |
| // Normally this is handled automatically by the cluster's MiniKdc, but |
| // since the cluster is running in a subprocess, we have to do it manually. |
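| // |
| // The returned set typically includes variables such as KRB5CCNAME and |
| // KRB5_CONFIG; the exact contents depend on what the MiniKdc exports. |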
| unordered_map<string, string> kdc_env_vars; |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_get_kdc_env_vars(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| ASSERT_TRUE(resp.has_get_kdc_env_vars()); |
| for (const auto& e : resp.get_kdc_env_vars().env_vars()) { |
| PCHECK(setenv(e.first.c_str(), e.second.c_str(), 1) == 0); |
| } |
| } else { |
| // get_kdc_env_vars should fail on a non-Kerberized cluster. |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_get_kdc_env_vars(); |
| ASSERT_OK(proto_->SendMessage(req)); |
| ASSERT_OK(proto_->ReceiveMessage(&resp)); |
| ASSERT_TRUE(resp.has_error()); |
| Status s = StatusFromPB(resp.error()); |
| ASSERT_TRUE(s.IsNotFound()); |
| ASSERT_STR_CONTAINS(s.ToString(), "kdc not found"); |
| } |
| |
| // Create a table. |
| { |
| KuduClientBuilder client_builder; |
| for (const auto& e : masters) { |
| HostPort hp; |
| ASSERT_OK(HostPortFromPB(e.bound_rpc_address(), &hp)); |
| client_builder.add_master_server_addr(hp.ToString()); |
| } |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(client_builder.Build(&client)); |
| KuduSchemaBuilder schema_builder; |
| schema_builder.AddColumn("foo") |
| ->Type(client::KuduColumnSchema::INT32) |
| ->NotNull() |
| ->PrimaryKey(); |
| KuduSchema schema; |
| ASSERT_OK(schema_builder.Build(&schema)); |
| unique_ptr<client::KuduTableCreator> table_creator( |
| client->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name("test") |
| .schema(&schema) |
| .set_range_partition_columns({ "foo" }) |
| .Create()); |
| } |
| |
| // Get the tservers. |
| vector<DaemonInfoPB> tservers; |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_get_tservers(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| ASSERT_TRUE(resp.has_get_tservers()); |
| ASSERT_EQ(kNumTservers, resp.get_tservers().tservers_size()); |
| tservers.assign(resp.get_tservers().tservers().begin(), |
| resp.get_tservers().tservers().end()); |
| } |
| |
| // Stop a tserver. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| *req.mutable_stop_daemon()->mutable_id() = tservers[0].id(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| |
| // Restart it. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| *req.mutable_start_daemon()->mutable_id() = tservers[0].id(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| |
| // Stop a master. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| *req.mutable_stop_daemon()->mutable_id() = masters[0].id(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| |
| // Restart it. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| *req.mutable_start_daemon()->mutable_id() = masters[0].id(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| |
| // Restart some non-existent daemons. |
| vector<DaemonIdentifierPB> daemons_to_restart; |
| { |
| // Unknown daemon type. |
| DaemonIdentifierPB id; |
| id.set_type(UNKNOWN_DAEMON); |
| daemons_to_restart.emplace_back(std::move(id)); |
| } |
| { |
| // Tablet server #5. |
| DaemonIdentifierPB id; |
| id.set_type(TSERVER); |
| id.set_index(5); |
| daemons_to_restart.emplace_back(std::move(id)); |
| } |
| { |
| // Master without an index. |
| DaemonIdentifierPB id; |
| id.set_type(MASTER); |
| daemons_to_restart.emplace_back(std::move(id)); |
| } |
| if (!enable_kerberos()) { |
| // KDC for a non-Kerberized cluster. |
| DaemonIdentifierPB id; |
| id.set_type(KDC); |
| daemons_to_restart.emplace_back(std::move(id)); |
| } |
| for (const auto& daemon : daemons_to_restart) { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| *req.mutable_start_daemon()->mutable_id() = daemon; |
| ASSERT_OK(proto_->SendMessage(req)); |
| ASSERT_OK(proto_->ReceiveMessage(&resp)); |
| ASSERT_TRUE(resp.has_error()); |
| } |
| |
| // Stop the cluster. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_stop_cluster(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| |
| // Restart it. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_start_cluster(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| |
| if (enable_kerberos()) { |
| // Restart the KDC. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_stop_daemon()->mutable_id()->set_type(KDC); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_start_daemon()->mutable_id()->set_type(KDC); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| // Test kinit by deleting the ticket cache, kinitting, and |
| // ensuring it is recreated. |
| { |
| char* ccache_path = getenv("KRB5CCNAME"); |
| ASSERT_TRUE(ccache_path); |
| ASSERT_OK(env_->DeleteFile(ccache_path)); |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_kinit()->set_username("test-user"); |
| ASSERT_OK(SendReceive(req, &resp)); |
| ASSERT_TRUE(env_->FileExists(ccache_path)); |
| } |
| } |
| |
| // Destroy the cluster. |
| { |
| ControlShellRequestPB req; |
| ControlShellResponsePB resp; |
| req.mutable_destroy_cluster(); |
| ASSERT_OK(SendReceive(req, &resp)); |
| } |
| } |
| |
| static void CreateTableWithFlushedData(const string& table_name, |
| InternalMiniCluster* cluster) { |
| // Use a schema with a high number of columns to encourage the creation of |
| // many data blocks. |
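| // (Each column is stored in its own blocks on disk, so a wide schema |
| // yields proportionally more blocks per flushed rowset.) |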
| KuduSchemaBuilder schema_builder; |
| schema_builder.AddColumn("key") |
| ->Type(client::KuduColumnSchema::INT32) |
| ->NotNull() |
| ->PrimaryKey(); |
| for (int i = 0; i < 50; i++) { |
| schema_builder.AddColumn(Substitute("col$0", i)) |
| ->Type(client::KuduColumnSchema::INT32) |
| ->NotNull(); |
| } |
| KuduSchema schema; |
| ASSERT_OK(schema_builder.Build(&schema)); |
| |
| // Create a table and write some data to it. |
| TestWorkload workload(cluster); |
| workload.set_schema(schema); |
| workload.set_table_name(table_name); |
| workload.set_num_replicas(1); |
| workload.Setup(); |
| workload.Start(); |
| ASSERT_EVENTUALLY([&](){ |
| ASSERT_GE(workload.rows_inserted(), 10000); |
| }); |
| workload.StopAndJoin(); |
| |
| // Flush all tablets belonging to this table. |
| for (int i = 0; i < cluster->num_tablet_servers(); i++) { |
| vector<scoped_refptr<TabletReplica>> replicas; |
| cluster->mini_tablet_server(i)->server()->tablet_manager()->GetTabletReplicas(&replicas); |
| for (const auto& r : replicas) { |
| if (r->tablet_metadata()->table_name() == table_name) { |
| ASSERT_OK(r->tablet()->Flush()); |
| } |
| } |
| } |
| } |
| |
| // This test simulates a user updating a deployment that has no data |
| // directories (i.e. just the wal dir). Any `fs_data_dirs` supplied |
| // thereafter that doesn't include `fs_wal_dir` would effectively be an |
| // attempt to swap out the data directories, which is not supported. |
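| // For example (paths illustrative): starting with only --fs_wal_dir=/d/wal |
| // (data then lives under the wal root) and later supplying |
| // --fs_data_dirs=/d/1 would abandon the blocks under the wal root. |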
| TEST_F(ToolTest, TestFsSwappingDirectoriesFailsGracefully) { |
| if (FLAGS_block_manager == "file") { |
| LOG(INFO) << "Skipping test, only log block manager is supported"; |
| return; |
| } |
| |
| // Configure the server to share the data root and wal root. |
| NO_FATALS(StartMiniCluster()); |
| MiniTabletServer* mts = mini_cluster_->mini_tablet_server(0); |
| mts->Shutdown(); |
| |
| // Now try to update to put data exclusively in a different directory. |
| const string& wal_root = mts->options()->fs_opts.wal_root; |
| const string& new_data_root_no_wal = GetTestPath("foo"); |
| string stderr; |
| Status s = RunTool(Substitute( |
| "fs update_dirs --fs_wal_dir=$0 --fs_data_dirs=$1", |
| wal_root, new_data_root_no_wal), nullptr, &stderr); |
| ASSERT_STR_CONTAINS(stderr, "no healthy data directories found"); |
| |
| // If we instead try to add the directory to the existing list of |
| // directories, Kudu should allow it. |
| vector<string> new_data_roots = { new_data_root_no_wal, wal_root }; |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "fs update_dirs --fs_wal_dir=$0 --fs_data_dirs=$1", |
| wal_root, JoinStrings(new_data_roots, ",")))); |
| } |
| |
| TEST_F(ToolTest, TestFsAddRemoveDataDirEndToEnd) { |
| const string kTableFoo = "foo"; |
| const string kTableBar = "bar"; |
| |
| if (FLAGS_block_manager == "file") { |
| LOG(INFO) << "Skipping test, only log block manager is supported"; |
| return; |
| } |
| |
| // Start a cluster whose tserver has multiple data directories. |
| InternalMiniClusterOptions opts; |
| opts.num_data_dirs = 2; |
| NO_FATALS(StartMiniCluster(std::move(opts))); |
| |
| // Create a table and flush some test data to it. |
| NO_FATALS(CreateTableWithFlushedData(kTableFoo, mini_cluster_.get())); |
| |
| // Add a new data directory. |
| MiniTabletServer* mts = mini_cluster_->mini_tablet_server(0); |
| const string& wal_root = mts->options()->fs_opts.wal_root; |
| vector<string> data_roots = mts->options()->fs_opts.data_roots; |
| string to_add = JoinPathSegments(DirName(data_roots.back()), "data-new"); |
| data_roots.emplace_back(to_add); |
| mts->Shutdown(); |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "fs update_dirs --fs_wal_dir=$0 --fs_data_dirs=$1", |
| wal_root, JoinStrings(data_roots, ",")))); |
| |
| // Reconfigure the tserver to use the newly added data directory and restart it. |
| // |
| // Note: WaitStarted() will return a bad status if any tablets fail to bootstrap. |
| mts->options()->fs_opts.data_roots = data_roots; |
| ASSERT_OK(mts->Start()); |
| ASSERT_OK(mts->WaitStarted()); |
| |
| // Create a second table and flush some data. The flush should write some |
| // data to the newly added data directory. |
| uint64_t disk_space_used_before; |
| ASSERT_OK(env_->GetFileSizeOnDiskRecursively(to_add, &disk_space_used_before)); |
| NO_FATALS(CreateTableWithFlushedData(kTableBar, mini_cluster_.get())); |
| uint64_t disk_space_used_after; |
| ASSERT_OK(env_->GetFileSizeOnDiskRecursively(to_add, &disk_space_used_after)); |
| ASSERT_GT(disk_space_used_after, disk_space_used_before); |
| |
| // Try to remove the newly added data directory. This will fail because |
| // tablets from the second table are configured to use it. |
| mts->Shutdown(); |
| data_roots.pop_back(); |
| string stderr; |
| ASSERT_TRUE(RunActionStderrString(Substitute( |
| "fs update_dirs --fs_wal_dir=$0 --fs_data_dirs=$1", |
| wal_root, JoinStrings(data_roots, ",")), &stderr).IsRuntimeError()); |
| ASSERT_STR_CONTAINS( |
| stderr, "Not found: cannot update data directories: at least one " |
| "tablet is configured to use removed data directory. Retry with --force " |
| "to override this"); |
| |
| // Make sure the failure really had no effect. |
| ASSERT_OK(mts->Start()); |
| ASSERT_OK(mts->WaitStarted()); |
| mts->Shutdown(); |
| |
| // If we force the removal it'll succeed, but the tserver will fail to |
| // bootstrap some tablets when restarted. |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "fs update_dirs --fs_wal_dir=$0 --fs_data_dirs=$1 --force", |
| wal_root, JoinStrings(data_roots, ",")))); |
| mts->options()->fs_opts.data_roots = data_roots; |
| ASSERT_OK(mts->Start()); |
| Status s = mts->WaitStarted(); |
| ASSERT_TRUE(s.IsNotFound()); |
| ASSERT_STR_CONTAINS(s.ToString(), "one or more data dirs may have been removed"); |
| |
| // Tablets belonging to the first table should still be OK, but those in the |
| // second table should have all failed. |
| { |
| vector<scoped_refptr<TabletReplica>> replicas; |
| mts->server()->tablet_manager()->GetTabletReplicas(&replicas); |
| ASSERT_GT(replicas.size(), 0); |
| for (const auto& r : replicas) { |
| const string& table_name = r->tablet_metadata()->table_name(); |
| if (table_name == kTableFoo) { |
| ASSERT_EQ(tablet::RUNNING, r->state()); |
| } else { |
| ASSERT_EQ(kTableBar, table_name); |
| ASSERT_EQ(tablet::FAILED, r->state()); |
| } |
| } |
| } |
| |
| // Add the removed data directory back. All tablets should successfully |
| // bootstrap. |
| mts->Shutdown(); |
| data_roots.emplace_back(to_add); |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "fs update_dirs --fs_wal_dir=$0 --fs_data_dirs=$1", |
| wal_root, JoinStrings(data_roots, ",")))); |
| mts->options()->fs_opts.data_roots = data_roots; |
| ASSERT_OK(mts->Start()); |
| ASSERT_OK(mts->WaitStarted()); |
| mts->Shutdown(); |
| |
| // Remove it again so that the second table's tablets fail once again. |
| data_roots.pop_back(); |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "fs update_dirs --fs_wal_dir=$0 --fs_data_dirs=$1 --force", |
| wal_root, JoinStrings(data_roots, ",")))); |
| mts->options()->fs_opts.data_roots = data_roots; |
| ASSERT_OK(mts->Start()); |
| s = mts->WaitStarted(); |
| ASSERT_TRUE(s.IsNotFound()); |
| ASSERT_STR_CONTAINS(s.ToString(), "one or more data dirs may have been removed"); |
| |
| // Delete the second table and wait for all of its tablets to be deleted. |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(mini_cluster_->CreateClient(nullptr, &client)); |
| ASSERT_OK(client->DeleteTable(kTableBar)); |
| ASSERT_EVENTUALLY([&]{ |
| vector<scoped_refptr<TabletReplica>> replicas; |
| mts->server()->tablet_manager()->GetTabletReplicas(&replicas); |
| for (const auto& r : replicas) { |
| ASSERT_NE(kTableBar, r->tablet_metadata()->table_name()); |
| } |
| }); |
| |
| // Shut down the tserver, add a new data directory, and restart it. There |
| // should be no bootstrapping errors because the second table (whose tablets |
| // were configured to use the removed data directory) is gone. |
| mts->Shutdown(); |
| data_roots.emplace_back(JoinPathSegments(DirName(data_roots.back()), "data-new2")); |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "fs update_dirs --fs_wal_dir=$0 --fs_data_dirs=$1", |
| wal_root, JoinStrings(data_roots, ",")))); |
| mts->options()->fs_opts.data_roots = data_roots; |
| ASSERT_OK(mts->Start()); |
| ASSERT_OK(mts->WaitStarted()); |
| |
| // Shut down again, delete the newly added data directory, and restart. No |
| // errors, because the first table's tablets were never configured to use the |
| // new data directory. |
| mts->Shutdown(); |
| data_roots.pop_back(); |
| NO_FATALS(RunActionStdoutNone(Substitute( |
| "fs update_dirs --fs_wal_dir=$0 --fs_data_dirs=$1", |
| wal_root, JoinStrings(data_roots, ",")))); |
| mts->options()->fs_opts.data_roots = data_roots; |
| ASSERT_OK(mts->Start()); |
| ASSERT_OK(mts->WaitStarted()); |
| } |
| |
| TEST_F(ToolTest, TestDumpFSWithNonDefaultMetadataDir) { |
| const string kTestDir = GetTestPath("test"); |
| ASSERT_OK(env_->CreateDir(kTestDir)); |
| string uuid; |
| FsManagerOpts opts; |
| { |
| opts.wal_root = JoinPathSegments(kTestDir, "wal"); |
| opts.metadata_root = JoinPathSegments(kTestDir, "meta"); |
| FsManager fs(env_, opts); |
| ASSERT_OK(fs.CreateInitialFileSystemLayout()); |
| ASSERT_OK(fs.Open()); |
| uuid = fs.uuid(); |
| } |
| // Because a non-default metadata directory was specified, users must |
| // provide enough arguments to open the FsManager, or else the FS tools |
| // will not work. The tool fails in its own process, so catch its output. |
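| // (With a custom metadata root, the tool cannot infer the metadata |
| // location from --fs_wal_dir alone, hence the extra flag further below.) |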
| string stderr; |
| Status s = RunTool(Substitute("fs dump uuid --fs_wal_dir=$0", opts.wal_root), |
| nullptr, &stderr, {}, {}); |
| ASSERT_TRUE(s.IsRuntimeError()); |
| ASSERT_STR_CONTAINS(s.ToString(), "process exited with non-zero status"); |
| SCOPED_TRACE(stderr); |
| ASSERT_STR_CONTAINS(stderr, "could not verify required directory"); |
| |
| // Providing the necessary arguments, the tool should work. |
| string stdout; |
| NO_FATALS(RunActionStdoutString(Substitute( |
| "fs dump uuid --fs_wal_dir=$0 --fs_metadata_dir=$1", |
| opts.wal_root, opts.metadata_root), &stdout)); |
| SCOPED_TRACE(stdout); |
| ASSERT_EQ(uuid, stdout); |
| } |
| |
| TEST_F(ToolTest, TestReplaceTablet) { |
| constexpr int kNumTservers = 3; |
| constexpr int kNumTablets = 3; |
| constexpr int kNumRows = 1000; |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = kNumTservers; |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| |
| // Setup a table using TestWorkload. |
| TestWorkload workload(cluster_.get()); |
| workload.set_num_tablets(kNumTablets); |
| workload.set_num_replicas(kNumTservers); |
| workload.Setup(); |
| |
| // Insert some rows. |
| workload.Start(); |
| while (workload.rows_inserted() < kNumRows) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| workload.StopAndJoin(); |
| |
| // Pick a tablet to break. |
| vector<string> tablet_ids; |
| TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()]; |
| ASSERT_OK(ListRunningTabletIds(ts, kTimeout, &tablet_ids)); |
| const string old_tablet_id = tablet_ids[Random(SeedRandom()).Uniform(tablet_ids.size())]; |
| |
| // Break the tablet by doing the following: |
| // 1. Tombstone two replicas. |
| // 2. Stop the tablet server hosting the third. |
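| // With two replicas tombstoned and the third offline, the tablet has no |
| // functioning majority and cannot recover on its own. |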
| for (int i = 0; i < 2; i++) { |
| ASSERT_OK(DeleteTablet(ts_map_[cluster_->tablet_server(i)->uuid()], old_tablet_id, |
| TabletDataState::TABLET_DATA_TOMBSTONED, kTimeout)); |
| } |
| cluster_->tablet_server(2)->Shutdown(); |
| |
| // Replace the tablet. |
| const string cmd = Substitute("tablet unsafe_replace_tablet $0 $1", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| old_tablet_id); |
| string stderr; |
| NO_FATALS(RunActionStderrString(cmd, &stderr)); |
| ASSERT_STR_CONTAINS(stderr, old_tablet_id); |
| |
| // Restart the down tablet server. |
| ASSERT_OK(cluster_->tablet_server(2)->Restart()); |
| |
| // After replacement and a short delay, the new tablet should be present |
| // and the old tablet should be gone, leaving overall the same number of tablets. |
| ASSERT_EVENTUALLY([&]() { |
| for (int i = 0; i < kNumTservers; i++) { |
| ts = ts_map_[cluster_->tablet_server(i)->uuid()]; |
| ASSERT_OK(ListRunningTabletIds(ts, kTimeout, &tablet_ids)); |
| ASSERT_TRUE(std::none_of(tablet_ids.begin(), tablet_ids.end(), |
| [&](const string& tablet_id) -> bool { return tablet_id == old_tablet_id; })); |
| } |
| }); |
| |
| // The replaced tablet can self-heal because of tombstoned voting. |
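| // A tombstoned replica retains its last-logged Raft state and may still |
| // vote in elections, so the replacement tablet's config can win a majority |
| // and bring itself back to full health. |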
| NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster()); |
| |
| // Sanity check: there should be no more rows than we inserted before the replace. |
| // TODO(wdberkeley): Should also be possible to keep inserting through a replace. |
| client::sp::shared_ptr<KuduTable> workload_table; |
| ASSERT_OK(workload.client()->OpenTable(workload.table_name(), &workload_table)); |
| ASSERT_GE(workload.rows_inserted(), CountTableRows(workload_table.get())); |
| } |
| |
| TEST_F(ToolTest, TestGetFlags) { |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = 1; |
| NO_FATALS(StartExternalMiniCluster(std::move(opts))); |
| |
| // Check that we get non-default flags. |
| // CSV formatting is easiest to match against. |
| // It seems safe to assume that -help and -logemaillevel will not be |
| // set, and that fs_wal_dir will be set to a non-default value. |
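| // Each CSV line has the form <flag>,<value>,<is default?>; for example, |
| // "help,false,true" means -help has its default value of false. |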
| for (const string& daemon_type : { "master", "tserver" }) { |
| const string& daemon_addr = daemon_type == "master" ? |
| cluster_->master()->bound_rpc_addr().ToString() : |
| cluster_->tablet_server(0)->bound_rpc_addr().ToString(); |
| const string& wal_dir = daemon_type == "master" ? |
| cluster_->master()->wal_dir() : |
| cluster_->tablet_server(0)->wal_dir(); |
| string out; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("$0 get_flags $1 -format=csv", daemon_type, daemon_addr), |
| &out)); |
| ASSERT_STR_NOT_MATCHES(out, "help,*"); |
| ASSERT_STR_NOT_MATCHES(out, "logemaillevel,*"); |
| ASSERT_STR_CONTAINS(out, Substitute("fs_wal_dir,$0,false", wal_dir)); |
| |
| // Check that we get all flags with -all_flags. |
| out.clear(); |
| NO_FATALS(RunActionStdoutString( |
| Substitute("$0 get_flags $1 -format=csv -all_flags", daemon_type, daemon_addr), |
| &out)); |
| ASSERT_STR_CONTAINS(out, "help,false,true"); |
| ASSERT_STR_CONTAINS(out, "logemaillevel,999,true"); |
| ASSERT_STR_CONTAINS(out, Substitute("fs_wal_dir,$0,false", wal_dir)); |
| |
| // Check that -flag_tags filter to matching tags. |
| // -logemaillevel is an unsafe flag. |
| out.clear(); |
| NO_FATALS(RunActionStdoutString( |
| Substitute("$0 get_flags $1 -format=csv -all_flags -flag_tags=stable", |
| daemon_type, daemon_addr), |
| &out)); |
| ASSERT_STR_CONTAINS(out, "help,false,true"); |
| ASSERT_STR_NOT_MATCHES(out, "logemaillevel,*"); |
| ASSERT_STR_CONTAINS(out, Substitute("fs_wal_dir,$0,false", wal_dir)); |
| } |
| } |
| |
| TEST_F(ToolTest, TestParseStacks) { |
| const string kDataPath = JoinPathSegments(GetTestExecutableDirectory(), |
| "testdata/sample-diagnostics-log.txt"); |
| const string kBadDataPath = JoinPathSegments(GetTestExecutableDirectory(), |
| "testdata/bad-diagnostics-log.txt"); |
| string stdout; |
| NO_FATALS(RunActionStdoutString( |
| Substitute("diagnose parse_stacks $0", kDataPath), |
| &stdout)); |
| // Spot check a few of the things that should be in the output. |
| ASSERT_STR_CONTAINS(stdout, "Stacks at 0314 11:54:20.737790 (periodic):"); |
| ASSERT_STR_CONTAINS(stdout, "0x1caef51 kudu::StackTraceSnapshot::SnapshotAllStacks()"); |
| ASSERT_STR_CONTAINS(stdout, "0x3f5ec0f710 <unknown>"); |
| |
| string stderr; |
| Status s = RunActionStderrString( |
| Substitute("diagnose parse_stacks $0", kBadDataPath), |
| &stderr); |
| ASSERT_TRUE(s.IsRuntimeError()); |
| ASSERT_STR_MATCHES(stderr, "failed to parse stacks from .*: at line 1: " |
| "invalid JSON payload.*lacks ending quotation"); |
| } |
| |
| class Is343ReplicaUtilTest : |
| public ToolTest, |
| public ::testing::WithParamInterface<bool> { |
| }; |
| INSTANTIATE_TEST_CASE_P(, Is343ReplicaUtilTest, ::testing::Bool()); |
| |
| TEST_P(Is343ReplicaUtilTest, Is343Cluster) { |
| constexpr auto kReplicationFactor = 3; |
| const auto is_343_scheme = GetParam(); |
| |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = kReplicationFactor; |
| opts.extra_master_flags = { |
| Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme), |
| }; |
| opts.extra_tserver_flags = { |
| Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme), |
| }; |
| NO_FATALS(StartExternalMiniCluster(opts)); |
| const auto& master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| |
| { |
| const string empty_name = ""; |
| bool is_343 = false; |
| const auto s = Is343SchemeCluster({ master_addr }, empty_name, &is_343); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| } |
| |
| { |
| bool is_343 = false; |
| const auto s = Is343SchemeCluster({ master_addr }, boost::none, &is_343); |
| ASSERT_TRUE(s.IsIncomplete()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "not a single table found"); |
| } |
| |
| // Create a table. |
| TestWorkload workload(cluster_.get()); |
| workload.set_num_replicas(kReplicationFactor); |
| workload.set_table_name("is_343_test_table"); |
| workload.Setup(); |
| |
| { |
| bool is_343 = false; |
| const auto s = Is343SchemeCluster({ master_addr }, boost::none, &is_343); |
| ASSERT_TRUE(s.ok()) << s.ToString(); |
| ASSERT_EQ(is_343_scheme, is_343); |
| } |
| } |
| |
| } // namespace tools |
| } // namespace kudu |