| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tserver/tablet_server.h" |
| |
| #include <unistd.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <functional> |
| #include <initializer_list> |
| #include <map> |
| #include <memory> |
| #include <optional> |
| #include <random> |
| #include <set> |
| #include <sstream> |
| #include <string> |
| #include <thread> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <google/protobuf/util/message_differencer.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/clock/clock.h" |
| #include "kudu/clock/hybrid_clock.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/encoded_key.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/partition.h" |
| #include "kudu/common/row_operations.h" |
| #include "kudu/common/row_operations.pb.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/timestamp.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/log-test-base.h" |
| #include "kudu/consensus/log.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/fs/block_id.h" |
| #include "kudu/fs/block_manager.h" |
| #include "kudu/fs/data_dirs.h" |
| #include "kudu/fs/fs-test-util.h" |
| #include "kudu/fs/fs.pb.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/stringprintf.h" |
| #include "kudu/gutil/strings/escaping.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/sysinfo.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/rpc/rpc_header.pb.h" |
| #include "kudu/rpc/user_credentials.h" |
| #include "kudu/server/rpc_server.h" |
| #include "kudu/server/server_base.pb.h" |
| #include "kudu/server/server_base.proxy.h" |
| #include "kudu/tablet/local_tablet_writer.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_metrics.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tserver/heartbeater.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/scanners.h" |
| #include "kudu/tserver/tablet_server-test-base.h" |
| #include "kudu/tserver/tablet_server_options.h" |
| #include "kudu/tserver/tablet_server_test_util.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/tserver/tserver_admin.proxy.h" |
| #include "kudu/tserver/tserver_service.pb.h" |
| #include "kudu/tserver/tserver_service.proxy.h" |
| #include "kudu/util/array_view.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/crc.h" |
| #include "kudu/util/curl_util.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/hdr_histogram.h" |
| #include "kudu/util/jsonwriter.h" |
| #include "kudu/util/logging_test_util.h" |
| #include "kudu/util/memory/arena.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/oid_generator.h" |
| #include "kudu/util/path_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| #include "kudu/util/zlib.h" |
| |
| using google::protobuf::util::MessageDifferencer; |
| using kudu::clock::Clock; |
| using kudu::clock::HybridClock; |
| using kudu::consensus::ConsensusStatePB; |
| using kudu::fs::BlockManager; |
| using kudu::fs::CreateCorruptBlock; |
| using kudu::fs::DataDirManager; |
| using kudu::pb_util::SecureDebugString; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::rpc::Messenger; |
| using kudu::rpc::MessengerBuilder; |
| using kudu::rpc::RpcController; |
| using kudu::tablet::LocalTabletWriter; |
| using kudu::tablet::RowSetDataPB; |
| using kudu::tablet::Tablet; |
| using kudu::tablet::TabletReplica; |
| using kudu::tablet::TabletStatePB; |
| using kudu::tablet::TabletSuperBlockPB; |
| using std::endl; |
| using std::make_shared; |
| using std::map; |
| using std::nullopt; |
| using std::pair; |
| using std::ostringstream; |
| using std::set; |
| using std::shared_ptr; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| DEFINE_int32(single_threaded_insert_latency_bench_warmup_rows, 100, |
| "Number of rows to insert in the warmup phase of the single threaded" |
| " tablet server insert latency micro-benchmark"); |
| |
| DEFINE_int32(single_threaded_insert_latency_bench_insert_rows, 1000, |
| "Number of rows to insert in the testing phase of the single threaded" |
| " tablet server insert latency micro-benchmark"); |
| |
| DEFINE_int32(delete_tablet_bench_num_flushes, 200, |
| "Number of disk row sets to flush in the delete tablet benchmark"); |
| |
| DEFINE_int32(num_tablets, 5, "Number of tablets to create for tests"); |
| DEFINE_int32(num_rowsets_per_tablet, 10, "Number of rowsets to create per tablet"); |
| |
| DECLARE_bool(crash_on_eio); |
| DECLARE_bool(enable_flush_deltamemstores); |
| DECLARE_bool(enable_flush_memrowset); |
| DECLARE_bool(enable_maintenance_manager); |
| DECLARE_bool(enable_rowset_compaction); |
| DECLARE_bool(enable_workload_score_for_perf_improvement_ops); |
| DECLARE_bool(fail_dns_resolution); |
| DECLARE_bool(rowset_metadata_store_keys); |
| DECLARE_bool(scanner_unregister_on_invalid_seq_id); |
| DECLARE_double(cfile_inject_corruption); |
| DECLARE_double(env_inject_eio); |
| DECLARE_double(env_inject_full); |
| DECLARE_double(tablet_inject_kudu_2233); |
| DECLARE_double(workload_score_upper_bound); |
| DECLARE_int32(flush_threshold_mb); |
| DECLARE_int32(flush_threshold_secs); |
| DECLARE_int32(flush_upper_bound_ms); |
| DECLARE_int32(fs_data_dirs_available_space_cache_seconds); |
| DECLARE_int32(fs_target_data_dirs_per_tablet); |
| DECLARE_int32(maintenance_manager_inject_latency_ms); |
| DECLARE_int32(maintenance_manager_num_threads); |
| DECLARE_int32(maintenance_manager_polling_interval_ms); |
| DECLARE_int32(memory_pressure_percentage); |
| DECLARE_int32(metrics_retirement_age_ms); |
| DECLARE_int32(rpc_service_queue_length); |
| DECLARE_int32(scanner_batch_size_rows); |
| DECLARE_int32(scanner_gc_check_interval_us); |
| DECLARE_int32(scanner_ttl_ms); |
| DECLARE_int32(tablet_bootstrap_inject_latency_ms); |
| DECLARE_int32(tablet_inject_latency_on_apply_write_op_ms); |
| DECLARE_int32(workload_stats_rate_collection_min_interval_ms); |
| DECLARE_int32(workload_stats_metric_collection_interval_ms); |
| DECLARE_string(block_manager); |
| DECLARE_string(env_inject_eio_globs); |
| DECLARE_string(env_inject_full_globs); |
| DECLARE_string(webserver_doc_root); |
| DECLARE_uint32(tablet_apply_pool_overload_threshold_ms); |
| |
| // Declare these metrics prototypes for simpler unit testing of their behavior. |
| METRIC_DECLARE_counter(block_manager_total_bytes_read); |
| METRIC_DECLARE_counter(log_block_manager_holes_punched); |
| METRIC_DECLARE_counter(rows_inserted); |
| METRIC_DECLARE_counter(rows_updated); |
| METRIC_DECLARE_counter(rows_deleted); |
| METRIC_DECLARE_counter(rpcs_queue_overflow); |
| METRIC_DECLARE_counter(rpcs_timed_out_in_queue); |
| METRIC_DECLARE_counter(scanners_expired); |
| METRIC_DECLARE_gauge_int32(startup_progress_steps_remaining); |
| METRIC_DECLARE_gauge_int64(startup_progress_time_elapsed); |
| METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management); |
| METRIC_DECLARE_gauge_uint64(log_block_manager_containers); |
| METRIC_DECLARE_gauge_size(active_scanners); |
| METRIC_DECLARE_gauge_size(tablet_active_scanners); |
| METRIC_DECLARE_gauge_size(num_rowsets_on_disk); |
| METRIC_DECLARE_histogram(flush_dms_duration); |
| METRIC_DECLARE_histogram(op_apply_queue_length); |
| METRIC_DECLARE_histogram(op_apply_queue_time); |
| |
| |
| namespace kudu { |
| |
| namespace tablet { |
| class RowSet; |
| } |
| |
| namespace tserver { |
| |
| class TabletServerTest : public TabletServerTestBase { |
| public: |
| // Starts the tablet server, override to start it later. |
| virtual void SetUp() OVERRIDE { |
| NO_FATALS(TabletServerTestBase::SetUp()); |
| NO_FATALS(StartTabletServer(/*num_data_dirs=*/1)); |
| } |
| |
| void DoOrderedScanTest(const Schema& projection, const string& expected_rows_as_string); |
| |
| void ScanYourWritesTest(uint64_t propagated_timestamp, ScanResponsePB* resp); |
| }; |
| |
| TEST_F(TabletServerTest, TestPingServer) { |
| // Ping the server. |
| PingRequestPB req; |
| PingResponsePB resp; |
| RpcController controller; |
| ASSERT_OK(proxy_->Ping(req, &resp, &controller)); |
| } |
| |
| TEST_F(TabletServerTest, TestStatus) { |
| // Get the server's status. |
| server::GetStatusRequestPB req; |
| server::GetStatusResponsePB resp; |
| RpcController controller; |
| ASSERT_OK(generic_proxy_->GetStatus(req, &resp, &controller)); |
| ASSERT_TRUE(resp.has_status()); |
| ASSERT_TRUE(resp.status().has_node_instance()); |
| ASSERT_EQ(mini_server_->uuid(), resp.status().node_instance().permanent_uuid()); |
| |
| // Regression test for KUDU-2148: try to get the status as the server is |
| // starting. To surface this more frequently, we restart the server a number |
| // of times. |
| CountDownLatch latch(1); |
| thread status_thread([&](){ |
| server::GetStatusRequestPB req; |
| server::GetStatusResponsePB resp; |
| RpcController controller; |
| while (latch.count() > 0) { |
| controller.Reset(); |
| resp.Clear(); |
| Status s = generic_proxy_->GetStatus(req, &resp, &controller); |
| if (s.ok()) { |
| // These two fields are guaranteed even if the request yielded an error. |
| CHECK(resp.has_status()); |
| CHECK(resp.status().has_node_instance()); |
| if (resp.has_error()) { |
| // But this one isn't set if the request yielded an error. |
| CHECK(!resp.status().has_version_info()); |
| } |
| } |
| } |
| }); |
| SCOPED_CLEANUP({ |
| latch.CountDown(); |
| status_thread.join(); |
| }); |
| |
| // Can't safely restart unless we allow the replica to be destroyed. |
| tablet_replica_.reset(); |
| |
| for (int i = 0; i < (AllowSlowTests() ? 100 : 10); i++) { |
| mini_server_->Shutdown(); |
| ASSERT_OK(mini_server_->Restart()); |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestServerClock) { |
| server::ServerClockRequestPB req; |
| server::ServerClockResponsePB resp; |
| RpcController controller; |
| |
| ASSERT_OK(generic_proxy_->ServerClock(req, &resp, &controller)); |
| ASSERT_GT(mini_server_->server()->clock()->Now().ToUint64(), resp.timestamp()); |
| } |
| |
| TEST_F(TabletServerTest, TestGetFlags) { |
| server::GenericServiceProxy proxy( |
| client_messenger_, mini_server_->bound_rpc_addr(), |
| mini_server_->bound_rpc_addr().host()); |
| |
| server::GetFlagsRequestPB req; |
| server::GetFlagsResponsePB resp; |
| |
| // Check that a default request returns flags set to a non-default value and |
| // does not return flags set to a default value. |
| // Throughout, we make the reasonable assumption that the -fs_wal_dir flag |
| // will have a non-default value, and the -help and unsafe -logemaillevel |
| // flags will have default values. |
| { |
| RpcController controller; |
| ASSERT_OK(proxy.GetFlags(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_TRUE(std::any_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "log_dir"; |
| })); |
| EXPECT_TRUE(std::none_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "help"; |
| })); |
| } |
| |
| // Check that specifying all flags returns even flags with default values. |
| { |
| RpcController controller; |
| req.set_all_flags(true); |
| ASSERT_OK(proxy.GetFlags(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_TRUE(std::any_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "log_dir"; |
| })); |
| EXPECT_TRUE(std::any_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "help"; |
| })); |
| EXPECT_TRUE(std::any_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "logemaillevel"; |
| })); |
| } |
| |
| // Check that filtering on tags excludes flags with no matching tag. |
| { |
| RpcController controller; |
| req.add_tags("stable"); |
| ASSERT_OK(proxy.GetFlags(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_TRUE(std::any_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "log_dir"; |
| })); |
| EXPECT_TRUE(std::any_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "help"; |
| })); |
| EXPECT_TRUE(std::none_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "logemaillevel"; |
| })); |
| } |
| |
| // Check that we get flags with -flags. |
| { |
| RpcController controller; |
| req.Clear(); |
| req.add_flags("log_dir"); |
| req.add_flags("logemaillevel"); |
| ASSERT_OK(proxy.GetFlags(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_TRUE(std::any_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "log_dir"; |
| })); |
| EXPECT_TRUE(std::none_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "help"; |
| })); |
| EXPECT_TRUE(std::any_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "logemaillevel"; |
| })); |
| } |
| |
| // Check -flags will ignore -all_flags. |
| { |
| RpcController controller; |
| req.Clear(); |
| req.set_all_flags(true); |
| req.add_flags("logemaillevel"); |
| ASSERT_OK(proxy.GetFlags(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_TRUE(std::none_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "log_dir"; |
| })); |
| EXPECT_TRUE(std::none_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "help"; |
| })); |
| EXPECT_TRUE(std::any_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "logemaillevel"; |
| })); |
| } |
| |
| // Check -flag_tags filter to matching tags with -flags. |
| { |
| RpcController controller; |
| req.Clear(); |
| req.add_flags("logemaillevel"); |
| req.add_tags("stable"); |
| ASSERT_OK(proxy.GetFlags(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_TRUE(std::none_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "log_dir"; |
| })); |
| EXPECT_TRUE(std::none_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "help"; |
| })); |
| EXPECT_TRUE(std::none_of(resp.flags().begin(), resp.flags().end(), |
| [](const server::GetFlagsResponsePB::Flag& flag) { |
| return flag.name() == "logemaillevel"; |
| })); |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestSetFlags) { |
| server::GenericServiceProxy proxy( |
| client_messenger_, mini_server_->bound_rpc_addr(), |
| mini_server_->bound_rpc_addr().host()); |
| |
| server::SetFlagRequestPB req; |
| server::SetFlagResponsePB resp; |
| |
| // Set an invalid flag. |
| { |
| RpcController controller; |
| req.set_flag("foo"); |
| req.set_value("bar"); |
| ASSERT_OK(proxy.SetFlag(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_EQ(server::SetFlagResponsePB::NO_SUCH_FLAG, resp.result()); |
| EXPECT_TRUE(resp.msg().empty()); |
| } |
| |
| // Set a valid flag to a valid value. |
| { |
| int32_t old_val = FLAGS_metrics_retirement_age_ms; |
| RpcController controller; |
| req.set_flag("metrics_retirement_age_ms"); |
| req.set_value("12345"); |
| ASSERT_OK(proxy.SetFlag(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_EQ(server::SetFlagResponsePB::SUCCESS, resp.result()); |
| EXPECT_EQ(resp.msg(), "metrics_retirement_age_ms set to 12345\n"); |
| EXPECT_EQ(Substitute("$0", old_val), resp.old_value()); |
| EXPECT_EQ(12345, FLAGS_metrics_retirement_age_ms); |
| } |
| |
| // Set a valid flag to an invalid value. |
| { |
| RpcController controller; |
| req.set_flag("metrics_retirement_age_ms"); |
| req.set_value("foo"); |
| ASSERT_OK(proxy.SetFlag(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_EQ(server::SetFlagResponsePB::BAD_VALUE, resp.result()); |
| EXPECT_EQ(resp.msg(), "Unable to set flag: bad value"); |
| EXPECT_EQ(12345, FLAGS_metrics_retirement_age_ms); |
| } |
| |
| // Try setting a flag which isn't runtime-modifiable |
| { |
| RpcController controller; |
| req.set_flag("tablet_bloom_target_fp_rate"); |
| req.set_value("1.0"); |
| ASSERT_OK(proxy.SetFlag(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_EQ(server::SetFlagResponsePB::NOT_SAFE, resp.result()); |
| } |
| |
| // Try again, but with the force flag. |
| { |
| RpcController controller; |
| req.set_flag("tablet_bloom_target_fp_rate"); |
| req.set_value("1.0"); |
| req.set_force(true); |
| ASSERT_OK(proxy.SetFlag(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_EQ(server::SetFlagResponsePB::SUCCESS, resp.result()); |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestWebPages) { |
| EasyCurl c; |
| faststring buf; |
| string addr = mini_server_->bound_http_addr().ToString(); |
| |
| // Tablets page should list tablet. |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/tablets", addr), |
| &buf)); |
| ASSERT_STR_CONTAINS(buf.ToString(), kTabletId); |
| ASSERT_STR_CONTAINS(buf.ToString(), "RANGE (key) PARTITION UNBOUNDED"); |
| |
| // Tablet page should include the schema. |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/tablet?id=$1", addr, kTabletId), |
| &buf)); |
| ASSERT_STR_CONTAINS(buf.ToString(), "key"); |
| ASSERT_STR_CONTAINS(buf.ToString(), "STRING NULLABLE"); |
| |
| // Test fetching metrics. |
| // Fetching metrics has the side effect of retiring metrics, but not in a single pass. |
| // So, we check a couple of times in a loop -- thus, if we had a bug where one of these |
| // metrics was accidentally un-referenced too early, we'd cause it to get retired. |
| // If the metrics survive several passes of fetching, then we are pretty sure they will |
| // stick around properly for the whole lifetime of the server. |
| FLAGS_metrics_retirement_age_ms = 0; |
| for (int i = 0; i < 3; i++) { |
| SCOPED_TRACE(i); |
| ASSERT_OK(c.FetchURL(strings::Substitute("http://$0/jsonmetricz", addr), &buf)); |
| |
| // Check that the tablet entry shows up. |
| ASSERT_STR_CONTAINS(buf.ToString(), "\"type\": \"tablet\""); |
| ASSERT_STR_CONTAINS(buf.ToString(), "\"id\": \"ffffffffffffffffffffffffffffffff\""); |
| ASSERT_STR_CONTAINS(buf.ToString(), "\"partition\": \"RANGE (key) PARTITION UNBOUNDED"); |
| |
| |
| // Check entity attributes. |
| ASSERT_STR_CONTAINS(buf.ToString(), "\"table_name\": \"TestTable\""); |
| ASSERT_STR_CONTAINS(buf.ToString(), "\"table_id\": \"TestTable\""); |
| |
| // Check for the existence of some particular metrics for which we've had early-retirement |
| // bugs in the past. |
| ASSERT_STR_CONTAINS(buf.ToString(), "hybrid_clock_timestamp"); |
| ASSERT_STR_CONTAINS(buf.ToString(), "active_scanners"); |
| ASSERT_STR_CONTAINS(buf.ToString(), "threads_started"); |
| ASSERT_STR_CONTAINS(buf.ToString(), "code_cache_queries"); |
| #ifdef TCMALLOC_ENABLED |
| ASSERT_STR_CONTAINS(buf.ToString(), "tcmalloc_max_total_thread_cache_bytes"); |
| #endif |
| ASSERT_STR_CONTAINS(buf.ToString(), "glog_info_messages"); |
| } |
| |
| // Smoke-test the tracing infrastructure. |
| ASSERT_OK(c.FetchURL( |
| Substitute("http://$0/tracing/json/get_buffer_percent_full", addr, kTabletId), |
| &buf)); |
| ASSERT_EQ(buf.ToString(), "0"); |
| |
| string enable_req_json = "{\"categoryFilter\":\"*\", \"useContinuousTracing\": \"true\"," |
| " \"useSampling\": \"false\"}"; |
| string req_b64; |
| strings::Base64Escape(enable_req_json, &req_b64); |
| |
| for (bool compressed : {false, true}) { |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/tracing/json/begin_recording?$1", |
| addr, |
| req_b64), &buf)); |
| ASSERT_EQ(buf.ToString(), ""); |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/tracing/json/end_recording$1", addr, |
| compressed ? "_compressed" : ""), |
| &buf)); |
| string json; |
| if (compressed) { |
| std::ostringstream ss; |
| ASSERT_OK(zlib::Uncompress(buf, &ss)); |
| json = ss.str(); |
| } else { |
| json = buf.ToString(); |
| } |
| |
| ASSERT_STR_CONTAINS(json, "__metadata"); |
| } |
| |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/tracing/json/categories", addr), |
| &buf)); |
| ASSERT_STR_CONTAINS(buf.ToString(), "\"log\""); |
| |
| // Smoke test the pprof contention profiler handler. |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/pprof/contention?seconds=1", addr), |
| &buf)); |
| ASSERT_STR_CONTAINS(buf.ToString(), "discarded samples = 0"); |
| #if defined(__linux__) |
| // The executable name appears as part of the dump of /proc/self/maps, which |
| // only exists on Linux. |
| ASSERT_STR_CONTAINS(buf.ToString(), "tablet_server-test"); |
| #endif |
| } |
| |
| // Ensure that when a replica is in a failed / shutdown state, it returns an |
| // error for ConsensusState() requests. |
| TEST_F(TabletServerTest, TestFailedTabletsRejectConsensusState) { |
| scoped_refptr<TabletReplica> replica; |
| TSTabletManager* tablet_manager = mini_server_->server()->tablet_manager(); |
| ASSERT_TRUE(tablet_manager->LookupTablet(kTabletId, &replica)); |
| replica->SetError(Status::IOError("This error will leave the replica FAILED state at shutdown")); |
| replica->Shutdown(); |
| ASSERT_EQ(tablet::FAILED, replica->state()); |
| |
| auto consensus = replica->shared_consensus(); |
| ASSERT_TRUE(consensus); |
| ConsensusStatePB cstate; |
| Status s = consensus->ConsensusState(&cstate); |
| ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Tablet replica is shutdown"); |
| } |
| |
| // Test that tablet replicas that get failed and deleted will eventually show |
| // up as failed tombstones on the web UI. |
| TEST_F(TabletServerTest, TestFailedTabletsOnWebUI) { |
| scoped_refptr<TabletReplica> replica; |
| TSTabletManager* tablet_manager = mini_server_->server()->tablet_manager(); |
| ASSERT_TRUE(tablet_manager->LookupTablet(kTabletId, &replica)); |
| replica->SetError(Status::IOError("This error will leave the replica FAILED state at shutdown")); |
| replica->Shutdown(); |
| ASSERT_EQ(tablet::FAILED, replica->state()); |
| |
| // Now delete the replica and leave it tombstoned, e.g. as if the failed |
| // replica were deleted. |
| TabletServerErrorPB::Code error_code; |
| ASSERT_OK(tablet_manager->DeleteTablet(kTabletId, |
| tablet::TABLET_DATA_TOMBSTONED, nullopt, &error_code)); |
| |
| EasyCurl c; |
| faststring buf; |
| const string addr = mini_server_->bound_http_addr().ToString(); |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/tablets", addr), &buf)); |
| |
| // The webui should have a record of a FAILED and tombstoned tablet replica. |
| ASSERT_STR_CONTAINS(buf.ToString(), "FAILED (TABLET_DATA_TOMBSTONED)"); |
| } |
| |
| // Test that tombstoned tablets are displayed correctly in the web ui: |
| // - After restart, status message of "Tombstoned" instead of "Tablet initializing...". |
| // - No consensus configuration. |
| TEST_F(TabletServerTest, TestTombstonedTabletOnWebUI) { |
| TSTabletManager* tablet_manager = mini_server_->server()->tablet_manager(); |
| TabletServerErrorPB::Code error_code; |
| ASSERT_OK( |
| tablet_manager->DeleteTablet(kTabletId, |
| tablet::TABLET_DATA_TOMBSTONED, |
| nullopt, |
| &error_code)); |
| |
| // Restart the server. We drop the tablet_replica_ reference since it becomes |
| // invalid when the server shuts down. |
| tablet_replica_.reset(); |
| mini_server_->Shutdown(); |
| ASSERT_OK(mini_server_->Restart()); |
| ASSERT_OK(mini_server_->WaitStarted()); |
| |
| EasyCurl c; |
| faststring buf; |
| const string addr = mini_server_->bound_http_addr().ToString(); |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/tablets", addr), &buf)); |
| |
| // Check the page contains a tombstoned tablet, and its state is not |
| // "Tablet initializing...". |
| string s = buf.ToString(); |
| ASSERT_STR_CONTAINS(s, "TABLET_DATA_TOMBSTONED"); |
| ASSERT_STR_NOT_CONTAINS(s, "Tablet initializing..."); |
| |
| // Since the consensus config shouldn't be displayed, the page should not |
| // contain the server's RPC address. |
| ASSERT_STR_NOT_CONTAINS(s, mini_server_->bound_rpc_addr().ToString()); |
| } |
| |
| // When tablet server merge metrics by the same attributes, the metric |
| // 'merged_entities_count_of_tablet' should be visible |
| TEST_F(TabletServerTest, TestMergedEntitiesCount) { |
| EasyCurl c; |
| faststring buf; |
| const string addr = mini_server_->bound_http_addr().ToString(); |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/metrics", addr), &buf)); |
| ASSERT_STR_NOT_CONTAINS(buf.ToString(), "merged_entities_count_of_tablet"); |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/metrics?merge_rules=tablet|table|table_name", addr), |
| &buf)); |
| ASSERT_STR_CONTAINS(buf.ToString(), "merged_entities_count_of_tablet"); |
| } |
| |
| class TabletServerDiskSpaceTest : public TabletServerTestBase, |
| public testing::WithParamInterface<string> { |
| public: |
| void SetUp() override { |
| FLAGS_block_manager = GetParam(); |
| NO_FATALS(TabletServerTestBase::SetUp()); |
| NO_FATALS(StartTabletServer(/*num_data_dirs=*/kNumDirs)); |
| } |
| protected: |
| const int kNumDirs = FLAGS_fs_target_data_dirs_per_tablet + 1; |
| }; |
| |
| // Test that when there isn't enough space in a tablet's data directory group |
| // and there are additional directories available, directories are added to the |
| // group, and the new groups are persisted to disk. |
| TEST_P(TabletServerDiskSpaceTest, TestFullGroupAddsDir) { |
| DataDirManager* dd_manager = mini_server_->server()->fs_manager()->dd_manager(); |
| vector<string> dir_group; |
| ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group)); |
| ASSERT_EQ(kNumDirs - 1, dir_group.size()); |
| FLAGS_fs_data_dirs_available_space_cache_seconds = 0; |
| FLAGS_env_inject_full_globs = JoinStrings(dir_group, ","); |
| FLAGS_env_inject_full = 1.0; |
| |
| // Insert some data and flush. This should lead to the creation of a block, |
| // and the addition of a new directory in the dir group. |
| unordered_set<string> old_group(dir_group.begin(), dir_group.end()); |
| NO_FATALS(InsertTestRowsRemote(1, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group)); |
| ASSERT_EQ(kNumDirs, dir_group.size()); |
| |
| // Grab the newly added directory and check that failing it means the tablet |
| // is in a failed directory. |
| string new_dir; |
| for (const auto& dir : dir_group) { |
| if (!ContainsKey(old_group, dir)) { |
| new_dir = dir; |
| break; |
| } |
| } |
| ASSERT_FALSE(new_dir.empty()); |
| string new_uuid; |
| ASSERT_TRUE(dd_manager->FindUuidByRoot(DirName(new_dir), &new_uuid)); |
| dd_manager->MarkDirFailedByUuid(new_uuid); |
| ASSERT_TRUE(dd_manager->IsTabletInFailedDir(kTabletId)); |
| |
| // The group should be the updated even after restarting the tablet server. |
| NO_FATALS(ShutdownAndRebuildTablet(kNumDirs)); |
| dd_manager = mini_server_->server()->fs_manager()->dd_manager(); |
| ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group)); |
| ASSERT_EQ(kNumDirs, dir_group.size()); |
| ASSERT_TRUE(dd_manager->FindUuidByRoot(DirName(new_dir), &new_uuid)); |
| dd_manager->MarkDirFailedByUuid(new_uuid); |
| ASSERT_TRUE(dd_manager->IsTabletInFailedDir(kTabletId)); |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(BlockManager, TabletServerDiskSpaceTest, |
| ::testing::ValuesIn(BlockManager::block_manager_types())); |
| |
// Categories of error used by tests in this file.
// NOTE(review): the code that consumes these values is not visible here; the
// per-value notes are inferred from the enumerator names -- confirm.
enum class ErrorType {
  // Presumably a failed disk / data directory.
  DISK_FAILURE,
  // Presumably corruption of a CFile.
  CFILE_CORRUPTION,
  // Presumably the corruption described in KUDU-2233.
  KUDU_2233_CORRUPTION,
};
| |
| class TabletServerStartupWebPageTest : public TabletServerTestBase { |
| public: |
| void SetUp() override { |
| // The embedded webserver renders the contents of the generated pages |
| // according to mustache's mappings found under the directory pointed to by |
| // the --webserver_doc_root flag, which is set to $KUDU_HOME/www by default. |
| // Since this test assumes to fetch the pre-rendered output for the startup |
| // page, it would fail if the KUDU_HOME environment variable were set and |
| // pointed to the location where 'www' subdirectory contained the required |
| // mustache mappings. Let's explicitly point the document root to nowhere, |
| // so no mustache-based rendering is done. |
| FLAGS_webserver_doc_root = ""; |
| |
| NO_FATALS(TabletServerTestBase::SetUp()); |
| NO_FATALS(StartTabletServer(kNumDirs)); |
| // Create a bunch of tablets with a bunch of rowsets. |
| ObjectIdGenerator oid; |
| for (int i = 0; i < FLAGS_num_tablets; i++) { |
| const auto tablet_id = oid.Next(); |
| ASSERT_OK(mini_server_->AddTestTablet(kTableId, tablet_id, schema_)); |
| ASSERT_OK(WaitForTabletRunning(tablet_id.c_str())); |
| for (int r = 0; r < FLAGS_num_rowsets_per_tablet; r++) { |
| NO_FATALS(InsertTestRowsDirect(r, 1, tablet_id)); |
| scoped_refptr<TabletReplica> replica; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(tablet_id, &replica)); |
| ASSERT_OK(replica->tablet()->Flush()); |
| } |
| } |
| } |
| protected: |
| const int kNumDirs = FLAGS_fs_target_data_dirs_per_tablet + 1; |
| |
| void IsStatusValid(const string& startup_page_status) { |
| ASSERT_STR_MATCHES(startup_page_status, "\"init_status\":(100|0)( |,)"); |
| ASSERT_STR_MATCHES(startup_page_status, "\"read_filesystem_status\":(100|0)( |,)"); |
| ASSERT_STR_MATCHES(startup_page_status, "\"read_instance_metadatafiles_status\":(100|0)( |,)"); |
| ASSERT_STR_MATCHES(startup_page_status, "\"read_data_directories_status\":" |
| "([0-9]|[1-9][0-9]|100)( |,)"); |
| ASSERT_STR_MATCHES(startup_page_status,"\"start_tablets_status\":([0-9]|[1-9][0-9]|100)"); |
| ASSERT_STR_MATCHES(startup_page_status,"\"start_rpc_server_status\":(100|0)( |,)"); |
| } |
| |
| void IsStatusComplete(const string& startup_page_status) { |
| ASSERT_STR_CONTAINS(startup_page_status, "\"init_status\":100"); |
| ASSERT_STR_CONTAINS(startup_page_status, "\"read_filesystem_status\":100"); |
| ASSERT_STR_CONTAINS(startup_page_status, "\"read_instance_metadatafiles_status\":100"); |
| ASSERT_STR_CONTAINS(startup_page_status, "\"read_data_directories_status\":100"); |
| ASSERT_STR_CONTAINS(startup_page_status, "\"start_tablets_status\":100"); |
| ASSERT_STR_CONTAINS(startup_page_status, "\"start_rpc_server_status\":100"); |
| } |
| |
| void CheckNonZeroMetricIsStable(const string& metric_name) { |
| EasyCurl c; |
| faststring buf; |
| string addr = mini_server_->bound_http_addr().ToString(); |
| const auto url = Substitute("http://$0/metrics?metrics=$1", addr, metric_name); |
| ASSERT_OK(c.FetchURL(url, &buf)); |
| const auto orig_buf = buf.ToString(); |
| ASSERT_STR_NOT_CONTAINS(orig_buf, "\"value\": 0"); |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| ASSERT_OK(c.FetchURL(url, &buf)); |
| ASSERT_EQ(orig_buf, buf.ToString()); |
| } |
| }; |
| |
| TEST_F(TabletServerStartupWebPageTest, TestStartupWebPage) { |
| const string url = Substitute("http://$0/startup", mini_server_->bound_http_addr().ToString()); |
| |
| // Verify if the startup status is complete. |
| mini_server_->WaitStarted(); |
| |
| EasyCurl c; |
| faststring buf; |
| ASSERT_OK(c.FetchURL(url, &buf)); |
| NO_FATALS(IsStatusComplete(buf.ToString())); |
| |
| // Fail a data directory. |
| FsManager* fs_manager = mini_server_->server()->fs_manager(); |
| FLAGS_env_inject_eio_globs = fs_manager->GetDataRootDirs()[1]; |
| FLAGS_env_inject_eio = 1.0; |
| |
| // Restart the tablet server and monitor the startup page contents. |
| tablet_replica_.reset(); |
| mini_server_->Shutdown(); |
| std::atomic<bool> run_status_reader = true; |
| |
| // Hammer the webpage and validate the status percentages. |
| thread status_reader([&] { |
| EasyCurl thread_c; |
| faststring thread_buf; |
| while (run_status_reader) { |
| if (!thread_c.FetchURL(url, &thread_buf).ok()) { |
| continue; |
| } |
| NO_FATALS(IsStatusValid(thread_buf.ToString())); |
| } |
| }); |
| SCOPED_CLEANUP({ |
| run_status_reader = false; |
| status_reader.join(); |
| }); |
| |
| mini_server_->Start(); |
| mini_server_->WaitStarted(); |
| run_status_reader = false; |
| |
| // After the server has startup up, ensure every startup step has 100 percent status. |
| ASSERT_OK(c.FetchURL(url, &buf)); |
| NO_FATALS(IsStatusComplete(buf.ToString())); |
| } |
| |
| TEST_F(TabletServerStartupWebPageTest, TestAggregateStartupMetrics) { |
| EasyCurl c; |
| faststring buf; |
| string addr = mini_server_->bound_http_addr().ToString(); |
| |
| // Verify the startup steps remaining are zero once the server is started up. |
| mini_server_->WaitStarted(); |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/metrics?metrics=startup_progress_steps_remaining", |
| addr), |
| &buf)); |
| string raw = buf.ToString(); |
| ASSERT_STR_MATCHES(raw, "startup_progress_steps_remaining.*\"value\": 0"); |
| |
| // Fetch time elapsed twice and ensure it is constant. |
| NO_FATALS(CheckNonZeroMetricIsStable("startup_progress_time_elapsed")); |
| } |
| |
| |
| TEST_F(TabletServerStartupWebPageTest, TestTabletsMetrics) { |
| EasyCurl c; |
| faststring buf; |
| // Verify if the startup status is complete. |
| mini_server_->WaitStarted(); |
| const string url = Substitute("http://$0/metrics", mini_server_->bound_http_addr().ToString()); |
| // Validate populated metrics in case of zero tablets during startup. |
| ASSERT_OK(c.FetchURL(url, &buf)); |
| string raw = buf.ToString(); |
| ASSERT_STR_MATCHES(raw, "tablets_num_total_startup\",\n[ ]+\"value\": 0"); |
| ASSERT_STR_MATCHES(raw, "tablets_num_opened_startup\",\n[ ]+\"value\": 0"); |
| ASSERT_STR_NOT_CONTAINS(raw, "tablets_opening_time_startup"); |
| |
| tablet_replica_.reset(); |
| mini_server_->Shutdown(); |
| ASSERT_OK(mini_server_->Start()); |
| ASSERT_OK(mini_server_->WaitStarted()); |
| |
| // Validate populated metrics in case of non-zero tablets during startup. |
| ASSERT_OK(c.FetchURL(url, &buf)); |
| raw = buf.ToString(); |
| ASSERT_STR_MATCHES(raw, Substitute("tablets_num_total_startup\",\n[ ]+\"value\": $0", |
| FLAGS_num_tablets + 1)); |
| ASSERT_STR_MATCHES(raw, Substitute("tablets_num_opened_startup\",\n[ ]+\"value\": $0", |
| FLAGS_num_tablets + 1)); |
| |
| // Fetch time taken twice and ensure it is constant. |
| NO_FATALS(CheckNonZeroMetricIsStable("tablets_opening_time_startup")); |
| } |
| |
| TEST_F(TabletServerStartupWebPageTest, TestLogBlockContainerMetrics) { |
| |
| EasyCurl c; |
| faststring buf; |
| mini_server_->WaitStarted(); |
| string addr = mini_server_->bound_http_addr().ToString(); |
| // Validate populated metrics in case of zero containers during startup. |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/metrics", addr), &buf)); |
| string raw = buf.ToString(); |
| if (mini_server_->options()->fs_opts.block_manager_type == "log") { |
| ASSERT_STR_MATCHES(raw, "log_block_manager_total_containers_startup\",\n[ ]+\"value\": 0"); |
| ASSERT_STR_MATCHES(raw, "log_block_manager_processed_containers_startup\",\n[ ]+\"value\": 0"); |
| // Since we open each directory and read all the contents, the time taken might not be |
| // 0 in case of new servers. So we just check if the time is constant at this point. |
| ASSERT_OK(c.FetchURL( |
| Substitute("http://$0/metrics?metrics=log_block_manager_containers_processing_time_startup", |
| addr),&buf)); |
| string time_elapsed = buf.ToString(); |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| ASSERT_OK(c.FetchURL( |
| Substitute("http://$0/metrics?metrics=log_block_manager_containers_processing_time_startup", |
| addr),&buf)); |
| ASSERT_EQ(time_elapsed, buf.ToString()); |
| // Create containers, get the count of containers present and shutdown the server. |
| for (int i = 0; i < FLAGS_delete_tablet_bench_num_flushes; i++) { |
| NO_FATALS(InsertTestRowsRemote(i, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| } |
| scoped_refptr<AtomicGauge<uint64_t>> container = |
| METRIC_log_block_manager_containers.Instantiate( |
| mini_server_->server()->metric_entity(), 0); |
| tablet_replica_.reset(); |
| mini_server_->Shutdown(); |
| ASSERT_OK(mini_server_->Start()); |
| ASSERT_OK(mini_server_->WaitStarted()); |
| |
| // Validate populated metrics in case of non-zero containers during startup. |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/metrics", addr), &buf)); |
| raw = buf.ToString(); |
| ASSERT_STR_MATCHES(raw, |
| Substitute("log_block_manager_total_containers_startup\",\n[ ]+\"value\": $0", |
| container->value())); |
| ASSERT_STR_MATCHES(raw, |
| Substitute("log_block_manager_processed_containers_startup\",\n[ ]+\"value\": $0", |
| container->value())); |
| |
| // Fetch time taken twice and ensure it is constant. |
| NO_FATALS(CheckNonZeroMetricIsStable("log_block_manager_containers_processing_time_startup")); |
| } else { |
| ASSERT_STR_NOT_CONTAINS(raw, "log_block_manager_total_containers_startup"); |
| ASSERT_STR_NOT_CONTAINS(raw, "log_block_manager_processed_containers_startup"); |
| ASSERT_STR_NOT_CONTAINS(raw, "log_block_manager_containers_processing_time_startup"); |
| } |
| } |
| |
| |
| class TabletServerDiskErrorTest : public TabletServerTestBase, |
| public testing::WithParamInterface<ErrorType> { |
| public: |
| virtual void SetUp() override { |
| const int kNumDirs = 5; |
| NO_FATALS(TabletServerTestBase::SetUp()); |
| // Ensure the server will flush frequently. |
| FLAGS_enable_maintenance_manager = true; |
| FLAGS_maintenance_manager_num_threads = kNumDirs; |
| FLAGS_flush_threshold_mb = 1; |
| FLAGS_flush_threshold_secs = 1; |
| |
| // Create a brand new tablet server with multiple disks, ensuring it can |
| // survive at least one disk failure. |
| NO_FATALS(StartTabletServer(/*num_data_dirs=*/ kNumDirs)); |
| } |
| }; |
| |
| INSTANTIATE_TEST_SUITE_P(ErrorType, TabletServerDiskErrorTest, ::testing::Values( |
| ErrorType::DISK_FAILURE, ErrorType::CFILE_CORRUPTION, ErrorType::KUDU_2233_CORRUPTION)); |
| |
| // Test that applies random write operations to a tablet with a high |
| // maintenance manager load and a non-zero error injection rate. |
| TEST_P(TabletServerDiskErrorTest, TestRandomOpSequence) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| typedef vector<RowOperationsPB::Type> OpTypeList; |
| const OpTypeList kOpsIfKeyNotPresent = { RowOperationsPB::INSERT, RowOperationsPB::UPSERT }; |
| |
| // If testing KUDU-2233 corruption, reduce the key-space significantly to |
| // make it more likely that rows will have history to merge, and delete more |
| // frequently to ensure a row's history is strewn across multiple rowsets. |
| const bool is_kudu_2233 = GetParam() == ErrorType::KUDU_2233_CORRUPTION; |
| const OpTypeList kOpsIfKeyPresent = is_kudu_2233 ? |
| OpTypeList{ RowOperationsPB::DELETE } : |
| OpTypeList{ RowOperationsPB::UPSERT, RowOperationsPB::UPDATE, RowOperationsPB::DELETE }; |
| const int kMaxKey = is_kudu_2233 ? 10 : 100000; |
| if (is_kudu_2233) { |
| // Failures from KUDU-2233 are only caught during compactions; disable them |
| // for now so we can start them only after we begin injecting errors. |
| FLAGS_enable_rowset_compaction = false; |
| } |
| if (GetParam() == ErrorType::DISK_FAILURE) { |
| // Set these way up-front so we can change a single value to actually start |
| // injecting errors. Inject errors into all data dirs but one. |
| const vector<string> failed_dirs = { mini_server_->options()->fs_opts.data_roots.begin() + 1, |
| mini_server_->options()->fs_opts.data_roots.end() }; |
| FLAGS_env_inject_eio_globs = JoinStrings(JoinPathSegmentsV(failed_dirs, "**"), ","); |
| } |
| |
| set<int> keys; |
| const auto GetRandomString = [&] { |
| return StringPrintf("%d", rand() % kMaxKey); |
| }; |
| |
| // Perform a random op (insert, update, upsert, or delete). |
| const auto PerformOp = [&] { |
| // Set up the request. |
| WriteRequestPB req; |
| req.set_tablet_id(kTabletId); |
| RETURN_NOT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| // Set up the other state. |
| WriteResponsePB resp; |
| RpcController controller; |
| RowOperationsPB::Type op_type; |
| int key = rand() % kMaxKey; |
| if (ContainsKey(keys, key)) { |
| // If the key already exists, update, upsert, or delete it. |
| op_type = kOpsIfKeyPresent[rand() % kOpsIfKeyPresent.size()]; |
| } else { |
| // ... otherwise, insert or upsert. |
| op_type = kOpsIfKeyNotPresent[rand() % kOpsIfKeyNotPresent.size()]; |
| } |
| |
| // Add the op to the request. |
| if (op_type != RowOperationsPB::DELETE) { |
| AddTestRowToPB(op_type, schema_, key, key, GetRandomString(), |
| req.mutable_row_operations()); |
| keys.insert(key); |
| } else { |
| AddTestKeyToPB(RowOperationsPB::DELETE, schema_, key, req.mutable_row_operations()); |
| keys.erase(key); |
| } |
| |
| // Finally, write to the server and log the response. |
| RETURN_NOT_OK_PREPEND(proxy_->Write(req, &resp, &controller), "Failed to write"); |
| LOG(INFO) << "Tablet server responded with: " << SecureDebugString(resp); |
| return resp.has_error() ? StatusFromPB(resp.error().status()) : Status::OK(); |
| }; |
| |
| // Perform some arbitrarily large number of ops, with some pauses to |
| // encourage flushes. |
| for (int i = 0; i < 500; i++) { |
| if (i % 10) { |
| SleepFor(MonoDelta::FromMilliseconds(100)); |
| } |
| ASSERT_OK(PerformOp()); |
| } |
| // At this point, a bunch of operations have gone through successfully. Start |
| // injecting errors. |
| switch (GetParam()) { |
| case ErrorType::DISK_FAILURE: |
| FLAGS_env_inject_eio = 0.01; |
| break; |
| case ErrorType::CFILE_CORRUPTION: |
| FLAGS_cfile_inject_corruption = 0.01; |
| break; |
| case ErrorType::KUDU_2233_CORRUPTION: |
| // KUDU-2233 errors only get triggered with very specific compactions, so |
| // bump the failure rate all the way up. |
| FLAGS_tablet_inject_kudu_2233 = 1.0; |
| FLAGS_enable_rowset_compaction = true; |
| break; |
| } |
| |
| // The tablet will eventually be failed and will not be able to accept |
| // updates. Keep on inserting until that happens. |
| ASSERT_EVENTUALLY([&] { |
| Status s; |
| for (int i = 0; i < 150 && s.ok(); i++) { |
| s = PerformOp(); |
| } |
| ASSERT_FALSE(s.ok()); |
| }); |
| LOG(INFO) << "Failure was caught by an op!"; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(tablet::FAILED, tablet_replica_->state()); |
| }); |
| LOG(INFO) << "Tablet was successfully failed"; |
| } |
| |
| // Regression test for KUDU-2635. |
| TEST_F(TabletServerTest, TestEIODuringDelete) { |
| // Delete some blocks, but don't always delete them persistently so we're |
| // left with some orphaned blocks in the orphaned blocks list. We'll do this |
| // by injecting some EIOs. |
| NO_FATALS(InsertTestRowsRemote(1, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| NO_FATALS(UpdateTestRowRemote(1, 2)); |
| ASSERT_OK(tablet_replica_->tablet()->FlushAllDMSForTests()); |
| FsManager* fs_manager = mini_server_->server()->fs_manager(); |
| FLAGS_env_inject_eio_globs = JoinPathSegments(fs_manager->GetDataRootDirs()[0], "**"); |
| FLAGS_env_inject_eio = 0.5; |
| ignore_result(tablet_replica_->tablet()->MajorCompactAllDeltaStoresForTests()); |
| |
| // Delete the tablet while still injecting failures. Even if we aren't |
| // successful in deleting our orphaned blocks list, we shouldn't crash. |
| DeleteTabletRequestPB req; |
| DeleteTabletResponsePB resp; |
| req.set_dest_uuid(fs_manager->uuid()); |
| req.set_tablet_id(kTabletId); |
| req.set_delete_type(tablet::TABLET_DATA_DELETED); |
| RpcController rpc; |
| ASSERT_OK(admin_proxy_->DeleteTablet(req, &resp, &rpc)); |
| } |
| |
| // Test that adding a directories enables tablet placement in the new |
| // directories, and that removing directories fails tablets that are striped |
| // across the removed directories. |
| TEST_F(TabletServerTest, TestAddRemoveDirectory) { |
| if (FLAGS_block_manager == "file") { |
| LOG(INFO) << "Skipping test since the file block manager does not support updating directories"; |
| return; |
| } |
| // Start with multiple data dirs so the dirs are suffixed with numbers, and |
| // so when we remove a data dirs, we'll be using the same set of dirs. |
| NO_FATALS(ShutdownAndRebuildTablet(/*num_data_dirs*/2)); |
| const char* kFooTablet1 = "fffffffffffffffffffffffffffffff1"; |
| ASSERT_OK(mini_server_->AddTestTablet("footable", kFooTablet1, schema_)); |
| ASSERT_OK(WaitForTabletRunning(kFooTablet1)); |
| |
| // Shut down and restart with a new directory. This is allowed, and the |
| // tablet server will be able to use the new directory if we create a new |
| // tablet. |
| NO_FATALS(ShutdownAndRebuildTablet(/*num_data_dirs*/3)); |
| const char* kFooTablet2 = "fffffffffffffffffffffffffffffff2"; |
| ASSERT_OK(mini_server_->AddTestTablet("footable", kFooTablet2, schema_)); |
| ASSERT_OK(WaitForTabletRunning(kFooTablet2)); |
| |
| // Now open up again with a our original two directories. The second tablet |
| // should fail because it should have been striped across the third |
| // directory. The first tablet should be unaffected. |
| NO_FATALS(ShutdownAndRebuildTablet(/*num_data_dirs*/2)); |
| ASSERT_EVENTUALLY([&] { |
| scoped_refptr<TabletReplica> replica1; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kFooTablet1, &replica1)); |
| ASSERT_EQ(TabletStatePB::RUNNING, replica1->state()); |
| scoped_refptr<TabletReplica> replica2; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kFooTablet2, &replica2)); |
| ASSERT_EQ(TabletStatePB::FAILED, replica2->state()); |
| }); |
| } |
| |
| TEST_F(TabletServerTest, TestFailReplicaOnKUDU2233Corruption) { |
| FLAGS_tablet_inject_kudu_2233 = 1; |
| // Trigger the code paths that crash a server in the case of KUDU-2233 |
| // compactions, i.e. where a compaction needs to merge the history of a |
| // single row across multiple rowsets. |
| NO_FATALS(InsertTestRowsRemote(1, 1)); |
| NO_FATALS(DeleteTestRowsRemote(1, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| NO_FATALS(InsertTestRowsRemote(1, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| Status s = tablet_replica_->tablet()->Compact(Tablet::FORCE_COMPACT_ALL); |
| ASSERT_TRUE(s.IsCorruption()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(tablet::FAILED, tablet_replica_->state()); |
| }); |
| } |
| |
| class TabletServerMaintenanceMemoryPressureTest : public TabletServerTestBase { |
| public: |
| void SetUp() override { |
| NO_FATALS(TabletServerTestBase::SetUp()); |
| FLAGS_enable_maintenance_manager = true; |
| FLAGS_flush_threshold_secs = 1; |
| FLAGS_memory_pressure_percentage = 0; |
| // For the sake of easier setup, slow down our maintenance polling interval. |
| FLAGS_maintenance_manager_polling_interval_ms = 1000; |
| |
| // While setting up rowsets, disable compactions and flushing. Do this |
| // before doing anything so we can have tighter control over the flushing |
| // of our rowsets. |
| FLAGS_enable_rowset_compaction = false; |
| FLAGS_enable_flush_deltamemstores = false; |
| FLAGS_enable_flush_memrowset = false; |
| NO_FATALS(StartTabletServer(/*num_data_dirs=*/1)); |
| } |
| }; |
| |
| // Regression test for KUDU-3002. Previously, when under memory pressure, we |
| // might starve older (usually small) DMS flushes in favor of (usually larger) |
| // MRS flushes. |
| TEST_F(TabletServerMaintenanceMemoryPressureTest, TestDontStarveDMSWhileUnderMemoryPressure) { |
| // First, set up a rowset with a delta. |
| NO_FATALS(InsertTestRowsDirect(1, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| NO_FATALS(UpdateTestRowRemote(1, 2)); |
| |
| // Roll onto a new log segment so our DMS anchors some WAL bytes. |
| ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed()); |
| ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests()); |
| |
| // Now start inserting to the tablet so every time we pick a maintenance op, |
| // we'll have a sizeable MRS. |
| std::atomic<bool> keep_inserting(true); |
| thread insert_thread([&] { |
| int cur_row = 2; |
| while (keep_inserting) { |
| NO_FATALS(InsertTestRowsDirect(cur_row++, 1)); |
| } |
| }); |
| SCOPED_CLEANUP({ |
| keep_inserting = false; |
| insert_thread.join(); |
| }); |
| |
| // Wait a bit for the MRS to build up and then enable flushing. |
| SleepFor(MonoDelta::FromSeconds(1)); |
| FLAGS_enable_flush_memrowset = true; |
| FLAGS_enable_flush_deltamemstores = true; |
| |
| // Despite always having a large MRS, we should eventually flush the DMS, |
| // since it anchors WALs. |
| scoped_refptr<Histogram> dms_flushes = |
| METRIC_flush_dms_duration.Instantiate(tablet_replica_->tablet()->GetMetricEntity()); |
| // NOTE: we don't use ASSERT_EVENTUALLY because gtest may race with the |
| // NO_FATALS call in the inserter thread. |
| constexpr int kTimeoutSecs = 30; |
| const MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(kTimeoutSecs); |
| while (dms_flushes->histogram()->TotalCount() < 1) { |
| if (MonoTime::Now() > deadline) { |
| FAIL() << Substitute("Didn't flush DMS in $0 seconds", kTimeoutSecs); |
| } |
| SleepFor(MonoDelta::FromMilliseconds(100)); |
| } |
| } |
| |
| // Regression test for KUDU-2929. Previously, when under memory pressure, we |
| // would never compact, even if there were something else to do. We'll simulate |
| // this by flushing some overlapping rowsets and then making sure we compact. |
| TEST_F(TabletServerMaintenanceMemoryPressureTest, TestCompactWhileUnderMemoryPressure) { |
| // Insert sets of overlapping rows. |
| // Since we're under memory pressure, we'll flush as soon as we're able. |
| NO_FATALS(InsertTestRowsDirect(1, 1)); |
| NO_FATALS(InsertTestRowsDirect(3, 1)); |
| FLAGS_enable_flush_memrowset = true; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(1, tablet_replica_->tablet()->num_rowsets()); |
| }); |
| NO_FATALS(InsertTestRowsDirect(2, 1)); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(2, tablet_replica_->tablet()->num_rowsets()); |
| }); |
| |
| // Even though we're under memory pressure, we should see compactions because |
| // there's nothing else to do. |
| FLAGS_enable_rowset_compaction = true; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(1, tablet_replica_->tablet()->num_rowsets()); |
| }); |
| } |
| |
| TEST_F(TabletServerTest, TestInsert) { |
| WriteRequestPB req; |
| |
| req.set_tablet_id(kTabletId); |
| |
| WriteResponsePB resp; |
| RpcController controller; |
| |
| scoped_refptr<TabletReplica> tablet; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
| scoped_refptr<Counter> rows_inserted = |
| METRIC_rows_inserted.Instantiate(tablet->tablet()->GetMetricEntity()); |
| ASSERT_EQ(0, rows_inserted->value()); |
| tablet.reset(); |
| |
| // Send a bad insert which has an empty schema. This should result |
| // in an error. |
| { |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 1234, 5678, "hello world via RPC", |
| req.mutable_row_operations()); |
| |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::MISMATCHED_SCHEMA, resp.error().code()); |
| Status s = StatusFromPB(resp.error().status()); |
| EXPECT_TRUE(s.IsInvalidArgument()); |
| ASSERT_STR_CONTAINS(s.ToString(), |
| "Client missing required column: key INT32 NOT NULL"); |
| req.clear_row_operations(); |
| } |
| |
| // Send an empty request with the correct schema. |
| // This should succeed and do nothing. |
| { |
| controller.Reset(); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| req.clear_row_operations(); |
| } |
| |
| // Send an actual row insert. |
| { |
| controller.Reset(); |
| RowOperationsPB* data = req.mutable_row_operations(); |
| data->Clear(); |
| |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 1234, 5678, |
| "hello world via RPC", data); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| req.clear_row_operations(); |
| ASSERT_EQ(1, rows_inserted->value()); |
| } |
| |
| // Send a batch with multiple rows, one of which is a duplicate of |
| // the above insert, and one of which has a too-large value. |
| // This should generate two errors into per_row_errors. |
| { |
| const string kTooLargeValue(100 * 1024, 'x'); |
| controller.Reset(); |
| RowOperationsPB* data = req.mutable_row_operations(); |
| data->Clear(); |
| |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 1, 1, "ceci n'est pas une dupe", data); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 2, 1, "also not a dupe key", data); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 1234, 1, "I am a duplicate key", data); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 3, 1, kTooLargeValue, data); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp); |
| ASSERT_EQ(3, rows_inserted->value()); // This counter only counts successful inserts. |
| ASSERT_EQ(2, resp.per_row_errors().size()); |
| |
| // Check the duplicate key error. |
| ASSERT_EQ(2, resp.per_row_errors().Get(0).row_index()); |
| Status s = StatusFromPB(resp.per_row_errors().Get(0).error()); |
| ASSERT_STR_CONTAINS(s.ToString(), "Already present"); |
| |
| // Check the value-too-large error. |
| ASSERT_EQ(3, resp.per_row_errors().Get(1).row_index()); |
| s = StatusFromPB(resp.per_row_errors().Get(1).error()); |
| ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument"); |
| } |
| |
| // get the clock's current timestamp |
| Timestamp now_before = mini_server_->server()->clock()->Now(); |
| |
| rows_inserted = nullptr; |
| NO_FATALS(ShutdownAndRebuildTablet()); |
| VerifyRows(schema_, { KeyValue(1, 1), KeyValue(2, 1), KeyValue(1234, 5678) }); |
| |
| // get the clock's timestamp after replay |
| Timestamp now_after = mini_server_->server()->clock()->Now(); |
| |
| // make sure 'now_after' is greater than or equal to 'now_before' |
| ASSERT_GE(now_after.value(), now_before.value()); |
| } |
| |
| TEST_F(TabletServerTest, TestExternalConsistencyModes_ClientPropagated) { |
| WriteRequestPB req; |
| req.set_tablet_id(kTabletId); |
| WriteResponsePB resp; |
| RpcController controller; |
| |
| scoped_refptr<TabletReplica> tablet; |
| ASSERT_TRUE( |
| mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, |
| &tablet)); |
| scoped_refptr<Counter> rows_inserted = |
| METRIC_rows_inserted.Instantiate(tablet->tablet()->GetMetricEntity()); |
| ASSERT_EQ(0, rows_inserted->value()); |
| |
| // get the current time |
| Timestamp current = mini_server_->server()->clock()->Now(); |
| // advance current to some time in the future. we do 5 secs to make |
| // sure this timestamp will still be in the future when it reaches the |
| // server. |
| current = HybridClock::TimestampFromMicroseconds( |
| HybridClock::GetPhysicalValueMicros(current) + 5000000); |
| |
| // Send an actual row insert. |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 1234, 5678, "hello world via RPC", |
| req.mutable_row_operations()); |
| |
| // set the external consistency mode and the timestamp |
| req.set_external_consistency_mode(CLIENT_PROPAGATED); |
| |
| req.set_propagated_timestamp(current.ToUint64()); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| req.clear_row_operations(); |
| ASSERT_EQ(1, rows_inserted->value()); |
| |
| // make sure the server returned a write timestamp where only |
| // the logical value was increased since he should have updated |
| // its clock with the client's value. |
| Timestamp write_timestamp(resp.timestamp()); |
| |
| ASSERT_EQ(HybridClock::GetPhysicalValueMicros(current), |
| HybridClock::GetPhysicalValueMicros(write_timestamp)); |
| |
| ASSERT_EQ(HybridClock::GetLogicalValue(current) + 1, |
| HybridClock::GetLogicalValue(write_timestamp)); |
| } |
| |
| TEST_F(TabletServerTest, TestExternalConsistencyModes_CommitWait) { |
| WriteRequestPB req; |
| req.set_tablet_id(kTabletId); |
| WriteResponsePB resp; |
| RpcController controller; |
| HybridClock* hclock = down_cast<HybridClock*, Clock>(mini_server_->server()->clock()); |
| |
| scoped_refptr<TabletReplica> tablet; |
| ASSERT_TRUE( |
| mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, |
| &tablet)); |
| scoped_refptr<Counter> rows_inserted = |
| METRIC_rows_inserted.Instantiate( |
| tablet->tablet()->GetMetricEntity()); |
| ASSERT_EQ(0, rows_inserted->value()); |
| |
| // get current time, with and without error |
| Timestamp now_before; |
| uint64_t error_before; |
| ASSERT_OK(hclock->NowWithError(&now_before, &error_before)); |
| |
| uint64_t now_before_usec = HybridClock::GetPhysicalValueMicros(now_before); |
| LOG(INFO) << "Submitting write with commit wait at: " << now_before_usec << " us +- " |
| << error_before << " us"; |
| |
| // Send an actual row insert. |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 1234, 5678, "hello world via RPC", |
| req.mutable_row_operations()); |
| |
| // set the external consistency mode to COMMIT_WAIT |
| req.set_external_consistency_mode(COMMIT_WAIT); |
| |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| req.clear_row_operations(); |
| ASSERT_EQ(1, rows_inserted->value()); |
| |
| // Two things must have happened. |
| // 1 - The write timestamp must be greater than 'now_before' |
| // 2 - The write must have taken at least 'error_before' to complete (two |
| // times more in average). |
| |
| Timestamp now_after; |
| uint64_t error_after; |
| ASSERT_OK(hclock->NowWithError(&now_after, &error_after)); |
| |
| Timestamp write_timestamp(resp.timestamp()); |
| |
| uint64_t write_took = HybridClock::GetPhysicalValueMicros(now_after) - |
| HybridClock::GetPhysicalValueMicros(now_before); |
| |
| LOG(INFO) << "Write applied at: " << HybridClock::GetPhysicalValueMicros(write_timestamp) |
| << " us, current time: " << HybridClock::GetPhysicalValueMicros(now_after) |
| << " us, write took: " << write_took << " us"; |
| |
| ASSERT_GT(write_timestamp.value(), now_before.value()); |
| |
| // see HybridClockTest.TestWaitUntilAfter_TestCase2 |
| if (error_after >= error_before) { |
| ASSERT_GE(write_took, 2 * error_before); |
| } else { |
| ASSERT_GE(write_took, error_before); |
| } |
| } |
| |
| |
| TEST_F(TabletServerTest, TestInsertAndMutate) { |
| |
| scoped_refptr<TabletReplica> tablet; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
| scoped_refptr<Counter> rows_inserted = |
| METRIC_rows_inserted.Instantiate(tablet->tablet()->GetMetricEntity()); |
| scoped_refptr<Counter> rows_updated = |
| METRIC_rows_updated.Instantiate(tablet->tablet()->GetMetricEntity()); |
| scoped_refptr<Counter> rows_deleted = |
| METRIC_rows_deleted.Instantiate(tablet->tablet()->GetMetricEntity()); |
| ASSERT_EQ(0, rows_inserted->value()); |
| ASSERT_EQ(0, rows_updated->value()); |
| ASSERT_EQ(0, rows_deleted->value()); |
| tablet.reset(); |
| |
| RpcController controller; |
| |
| { |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| req.set_tablet_id(kTabletId); |
| RowOperationsPB* data = req.mutable_row_operations(); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 1, 1, "original1", data); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 2, 2, "original2", data); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 3, 3, "original3", data); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp); |
| ASSERT_EQ(0, resp.per_row_errors().size()); |
| ASSERT_EQ(3, rows_inserted->value()); |
| ASSERT_EQ(0, rows_updated->value()); |
| controller.Reset(); |
| } |
| |
| // Try and mutate the rows inserted above |
| { |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| req.set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| AddTestRowToPB(RowOperationsPB::UPDATE, schema_, 1, 2, "mutation1", |
| req.mutable_row_operations()); |
| AddTestRowToPB(RowOperationsPB::UPDATE, schema_, 2, 3, "mutation2", |
| req.mutable_row_operations()); |
| AddTestRowToPB(RowOperationsPB::UPDATE, schema_, 3, 4, "mutation3", |
| req.mutable_row_operations()); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp); |
| ASSERT_EQ(0, resp.per_row_errors().size()); |
| ASSERT_EQ(3, rows_inserted->value()); |
| ASSERT_EQ(3, rows_updated->value()); |
| controller.Reset(); |
| } |
| |
| // Try and mutate a non existent row key (should get an error) |
| { |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| req.set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| AddTestRowToPB(RowOperationsPB::UPDATE, schema_, 1234, 2, "mutated", |
| req.mutable_row_operations()); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp); |
| ASSERT_EQ(1, resp.per_row_errors().size()); |
| ASSERT_EQ(3, rows_updated->value()); |
| controller.Reset(); |
| } |
| |
| // Try and delete 1 row |
| { |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| req.set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| AddTestKeyToPB(RowOperationsPB::DELETE, schema_, 1, req.mutable_row_operations()); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error())<< SecureShortDebugString(resp); |
| ASSERT_EQ(0, resp.per_row_errors().size()); |
| ASSERT_EQ(3, rows_updated->value()); |
| ASSERT_EQ(1, rows_deleted->value()); |
| controller.Reset(); |
| } |
| |
| // Now try and mutate a row we just deleted, we should get an error |
| { |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| req.set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| AddTestRowToPB(RowOperationsPB::UPDATE, schema_, 1, 2, "mutated1", |
| req.mutable_row_operations()); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error())<< SecureShortDebugString(resp); |
| ASSERT_EQ(1, resp.per_row_errors().size()); |
| controller.Reset(); |
| } |
| |
| ASSERT_EQ(3, rows_inserted->value()); |
| ASSERT_EQ(3, rows_updated->value()); |
| |
| // At this point, we have two rows left (row key 2 and 3). |
| VerifyRows(schema_, { KeyValue(2, 3), KeyValue(3, 4) }); |
| |
| // Do a mixed operation (some insert, update, and delete, some of which fail) |
| { |
| const string kTooLargeValue(100 * 1024, 'x'); |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| req.set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| RowOperationsPB* ops = req.mutable_row_operations(); |
| // op 0: Mutate row 1, which doesn't exist. This should fail. |
| AddTestRowToPB(RowOperationsPB::UPDATE, schema_, 1, 3, "mutate_should_fail", ops); |
| // op 1: Insert a new row 4 (succeeds) |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 4, 4, "new row 4", ops); |
| // op 2: Delete a non-existent row 5 (should fail) |
| AddTestKeyToPB(RowOperationsPB::DELETE, schema_, 5, ops); |
| // op 3: Insert a new row 6 (succeeds) |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 6, 6, "new row 6", ops); |
| // op 4: update a row with a too-large value (fail) |
| AddTestRowToPB(RowOperationsPB::UPDATE, schema_, 4, 6, kTooLargeValue, ops); |
| |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error())<< SecureShortDebugString(resp); |
| ASSERT_EQ(3, resp.per_row_errors().size()); |
| EXPECT_EQ("row_index: 0 error { code: NOT_FOUND message: \"key not found\" }", |
| SecureShortDebugString(resp.per_row_errors(0))); |
| EXPECT_EQ("row_index: 2 error { code: NOT_FOUND message: \"key not found\" }", |
| SecureShortDebugString(resp.per_row_errors(1))); |
| EXPECT_EQ("row_index: 4 error { code: INVALID_ARGUMENT message: " |
| "\"value too large for column \\'string_val\\' (102400 bytes, " |
| "maximum is 65536 bytes)\" }", |
| SecureShortDebugString(resp.per_row_errors(2))); |
| controller.Reset(); |
| } |
| |
| // get the clock's current timestamp |
| Timestamp now_before = mini_server_->server()->clock()->Now(); |
| |
| rows_inserted = nullptr; |
| rows_updated = nullptr; |
| NO_FATALS(ShutdownAndRebuildTablet()); |
| VerifyRows(schema_, { KeyValue(2, 3), KeyValue(3, 4), KeyValue(4, 4), KeyValue(6, 6) }); |
| |
| // get the clock's timestamp after replay |
| Timestamp now_after = mini_server_->server()->clock()->Now(); |
| |
| // make sure 'now_after' is greater than or equal to 'now_before' |
| ASSERT_GE(now_after.value(), now_before.value()); |
| } |
| |
| // Send write requests whose single row operation is of a type that is not a |
| // write operation, and make sure the error that comes back makes sense. |
| TEST_F(TabletServerTest, TestInvalidWriteRequest_WrongOpType) { |
| // Sends a one-row write of the given operation type and returns the server's |
| // response. The lambda returns a value, so failures are checked with CHECK_OK |
| // rather than with ASSERT_OK. |
| const auto write_single_op = [&] (RowOperationsPB::Type type) { |
| WriteRequestPB request; |
| WriteResponsePB response; |
| RpcController rpc; |
| |
| request.set_tablet_id(kTabletId); |
| CHECK_OK(SchemaToPB(schema_, request.mutable_schema())); |
| AddTestRowToPB(type, schema_, 1234, 5678, "foo", request.mutable_row_operations()); |
| SCOPED_TRACE(SecureDebugString(request)); |
| CHECK_OK(proxy_->Write(request, &response, &rpc)); |
| return response; |
| }; |
| |
| // Operation types that are inappropriate for write requests. |
| for (const auto type : { RowOperationsPB::SPLIT_ROW, |
| RowOperationsPB::RANGE_LOWER_BOUND, |
| RowOperationsPB::RANGE_UPPER_BOUND, |
| RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND, |
| RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND }) { |
| const WriteResponsePB response = write_single_op(type); |
| SCOPED_TRACE(SecureDebugString(response)); |
| ASSERT_TRUE(response.has_error()); |
| const auto& error = response.error(); |
| ASSERT_EQ(TabletServerErrorPB::MISMATCHED_SCHEMA, error.code()); |
| ASSERT_EQ(AppStatusPB::INVALID_ARGUMENT, error.status().code()); |
| ASSERT_STR_CONTAINS(error.status().message(), "Invalid write operation type"); |
| } |
| |
| // UNKNOWN is an unexpected operation type in all cases, so it is reported |
| // with a different status code and message. |
| const WriteResponsePB unknown_response = write_single_op(RowOperationsPB::UNKNOWN); |
| SCOPED_TRACE(SecureDebugString(unknown_response)); |
| ASSERT_TRUE(unknown_response.has_error()); |
| const auto& unknown_error = unknown_response.error(); |
| ASSERT_EQ(TabletServerErrorPB::MISMATCHED_SCHEMA, unknown_error.code()); |
| ASSERT_EQ(AppStatusPB::NOT_SUPPORTED, unknown_error.status().code()); |
| ASSERT_STR_CONTAINS(unknown_error.status().message(), "Unknown row operation type"); |
| } |
| |
| // Test that a write whose client schema does not match the tablet schema is |
| // rejected with an error in the response. |
| TEST_F(TabletServerTest, TestInvalidWriteRequest_BadSchema) { |
| // Extend the tablet schema with one extra column and build it twice: once |
| // with Build() and once with BuildWithoutIds(). |
| SchemaBuilder builder(schema_); |
| ASSERT_OK(builder.AddColumn("col_doesnt_exist", INT32)); |
| Schema schema_with_ids = builder.Build(); |
| Schema schema_without_ids = builder.BuildWithoutIds(); |
| |
| // Case 1: insert a row that sets the extra column. |
| { |
| WriteRequestPB request; |
| WriteResponsePB response; |
| RpcController rpc; |
| |
| request.set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToPB(schema_without_ids, request.mutable_schema())); |
| |
| KuduPartialRow extra_col_row(&schema_without_ids); |
| CHECK_OK(extra_col_row.SetInt32("key", 1234)); |
| CHECK_OK(extra_col_row.SetInt32("int_val", 5678)); |
| CHECK_OK(extra_col_row.SetStringCopy("string_val", "hello world via RPC")); |
| CHECK_OK(extra_col_row.SetInt32("col_doesnt_exist", 91011)); |
| RowOperationsPBEncoder encoder(request.mutable_row_operations()); |
| encoder.Add(RowOperationsPB::INSERT, extra_col_row); |
| |
| SCOPED_TRACE(SecureDebugString(request)); |
| ASSERT_OK(proxy_->Write(request, &response, &rpc)); |
| SCOPED_TRACE(SecureDebugString(response)); |
| ASSERT_TRUE(response.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::MISMATCHED_SCHEMA, response.error().code()); |
| ASSERT_STR_CONTAINS(response.error().status().message(), |
| "Client provided column col_doesnt_exist INT32 NOT NULL" |
| " not present in tablet"); |
| } |
| |
| // Case 2: update a row using the schema variant that carries column IDs. |
| { |
| WriteRequestPB request; |
| WriteResponsePB response; |
| RpcController rpc; |
| |
| request.set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToPB(schema_with_ids, request.mutable_schema())); |
| AddTestKeyToPB(RowOperationsPB::UPDATE, schema_with_ids, 1, |
| request.mutable_row_operations()); |
| |
| SCOPED_TRACE(SecureDebugString(request)); |
| ASSERT_OK(proxy_->Write(request, &response, &rpc)); |
| SCOPED_TRACE(SecureDebugString(response)); |
| ASSERT_TRUE(response.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::INVALID_SCHEMA, response.error().code()); |
| ASSERT_STR_CONTAINS(response.error().status().message(), |
| "User requests should not have Column IDs"); |
| } |
| } |
| |
| // Tablet hooks that issue a remote row update every time a flush/compaction |
| // lifecycle hook fires, so that mutations of all possible types can be created |
| // deterministically. The point is to verify that such mutations are replayed |
| // correctly on tablet bootstrap. |
| // |
| // Each hook updates a different row key (1 through 7); the value written is a |
| // per-hook base (10, 20, ..., 70) plus the current iteration count. |
| class MyCommonHooks : public Tablet::FlushCompactCommonHooks, |
| public Tablet::FlushFaultHooks, |
| public Tablet::CompactionFaultHooks { |
| public: |
| explicit MyCommonHooks(TabletServerTest* test) : test_(test) {} |
| |
| // Updates row 'key' to 'new_int_val' through the test fixture and returns OK. |
| Status DoHook(int32_t key, int32_t new_int_val) { |
| test_->UpdateTestRowRemote(key, new_int_val); |
| return Status::OK(); |
| } |
| |
| // Bumps the offset that is added to every value written by the hooks. |
| void increment_iteration() { |
| ++iteration_; |
| } |
| |
| // Should land pre-flush and therefore get flushed. |
| virtual Status PostSwapNewMemRowSet() OVERRIDE { |
| return DoHook(1, 10 + iteration_); |
| } |
| // Should land after the flush but before the duplicating row set, i.e. it |
| // should appear as a missed delta. |
| virtual Status PostTakeMvccSnapshot() OVERRIDE { |
| return DoHook(2, 20 + iteration_); |
| } |
| // Should appear as a missed delta as well. |
| virtual Status PostWriteSnapshot() OVERRIDE { |
| return DoHook(3, 30 + iteration_); |
| } |
| // Should appear as a duplicated mutation. |
| virtual Status PostSwapInDuplicatingRowSet() OVERRIDE { |
| return DoHook(4, 40 + iteration_); |
| } |
| // Should appear as a duplicated mutation as well. |
| virtual Status PostReupdateMissedDeltas() OVERRIDE { |
| return DoHook(5, 50 + iteration_); |
| } |
| // Should go into the new delta. |
| virtual Status PostSwapNewRowSet() OVERRIDE { |
| return DoHook(6, 60 + iteration_); |
| } |
| // Should land pre-flush (only on compactions). |
| virtual Status PostSelectIterators() OVERRIDE { |
| return DoHook(7, 70 + iteration_); |
| } |
| |
| protected: |
| TabletServerTest* test_; |
| int iteration_ = 0; |
| }; |
| |
| // Tests performing mutations that are going to the initial MRS |
| // or to a DMS, when the MRS is flushed. This also tests that the |
| // log produced on recovery allows to re-recover the original state. |
| TEST_F(TabletServerTest, TestRecoveryWithMutationsWhileFlushing) { |
| |
| NO_FATALS(InsertTestRowsRemote(1, 7)); |
| |
| // Each hook updates one row when its stage of the flush is reached. |
| auto hooks(make_shared<MyCommonHooks>(this)); |
| |
| tablet_replica_->tablet()->SetFlushHooksForTests(hooks); |
| tablet_replica_->tablet()->SetCompactionHooksForTests(hooks); |
| tablet_replica_->tablet()->SetFlushCompactCommonHooksForTests(hooks); |
| |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| |
| // Shutdown the tserver and try and rebuild the tablet from the log |
| // produced on recovery (recovery flushed no state, but produced a new |
| // log). The rebuild returns a status; assert on it so that a failed |
| // rebuild is reported here rather than silently dropped. |
| ASSERT_OK(ShutdownAndRebuildTablet()); |
| NO_FATALS(VerifyRows(schema_, { KeyValue(1, 10), |
| KeyValue(2, 20), |
| KeyValue(3, 30), |
| KeyValue(4, 40), |
| KeyValue(5, 50), |
| KeyValue(6, 60), |
| // the last hook only fires on compaction |
| // so this isn't mutated |
| KeyValue(7, 7) })); |
| |
| // Shutdown and rebuild again to test that the log generated during |
| // the previous recovery allows to perform recovery again. |
| ASSERT_OK(ShutdownAndRebuildTablet()); |
| NO_FATALS(VerifyRows(schema_, { KeyValue(1, 10), |
| KeyValue(2, 20), |
| KeyValue(3, 30), |
| KeyValue(4, 40), |
| KeyValue(5, 50), |
| KeyValue(6, 60), |
| KeyValue(7, 7) })); |
| } |
| |
| // Tests performing mutations that are going to a DMS or to the following |
| // DMS, when the initial one is flushed. |
| TEST_F(TabletServerTest, TestRecoveryWithMutationsWhileFlushingAndCompacting) { |
| |
| NO_FATALS(InsertTestRowsRemote(1, 7)); |
| |
| // Each hook updates one row when its stage of the flush/compaction is reached. |
| auto hooks(make_shared<MyCommonHooks>(this)); |
| |
| tablet_replica_->tablet()->SetFlushHooksForTests(hooks); |
| tablet_replica_->tablet()->SetCompactionHooksForTests(hooks); |
| tablet_replica_->tablet()->SetFlushCompactCommonHooksForTests(hooks); |
| |
| // flush the first time |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| |
| // The rebuild returns a status; assert on it so that a failed rebuild is |
| // reported here rather than silently dropped. |
| ASSERT_OK(ShutdownAndRebuildTablet()); |
| NO_FATALS(VerifyRows(schema_, { KeyValue(1, 10), |
| KeyValue(2, 20), |
| KeyValue(3, 30), |
| KeyValue(4, 40), |
| KeyValue(5, 50), |
| KeyValue(6, 60), |
| KeyValue(7, 7) })); |
| hooks->increment_iteration(); |
| |
| // set the hooks on the new tablet |
| tablet_replica_->tablet()->SetFlushHooksForTests(hooks); |
| tablet_replica_->tablet()->SetCompactionHooksForTests(hooks); |
| tablet_replica_->tablet()->SetFlushCompactCommonHooksForTests(hooks); |
| |
| // insert an additional row so that we can flush |
| NO_FATALS(InsertTestRowsRemote(8, 1)); |
| |
| // flush an additional MRS so that we have two DiskRowSets and then compact |
| // them making sure that mutations executed mid compaction are replayed as |
| // expected |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| NO_FATALS(VerifyRows(schema_, { KeyValue(1, 11), |
| KeyValue(2, 21), |
| KeyValue(3, 31), |
| KeyValue(4, 41), |
| KeyValue(5, 51), |
| KeyValue(6, 61), |
| KeyValue(7, 7), |
| KeyValue(8, 8) })); |
| |
| hooks->increment_iteration(); |
| ASSERT_OK(tablet_replica_->tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); |
| |
| // get the clock's current timestamp |
| Timestamp now_before = mini_server_->server()->clock()->Now(); |
| |
| // Shutdown the tserver and try and rebuild the tablet from the log |
| // produced on recovery (recovery flushed no state, but produced a new |
| // log). |
| ASSERT_OK(ShutdownAndRebuildTablet()); |
| NO_FATALS(VerifyRows(schema_, { KeyValue(1, 11), |
| KeyValue(2, 22), |
| KeyValue(3, 32), |
| KeyValue(4, 42), |
| KeyValue(5, 52), |
| KeyValue(6, 62), |
| KeyValue(7, 72), |
| KeyValue(8, 8) })); |
| |
| // get the clock's timestamp after replay |
| Timestamp now_after = mini_server_->server()->clock()->Now(); |
| |
| // make sure 'now_after' is greater than or equal to 'now_before' |
| ASSERT_GE(now_after.value(), now_before.value()); |
| } |
| |
| // Short alias for NO_FATALS, used by the tests below to keep call sites compact. |
| #define ANFF NO_FATALS |
| |
| // Regression test for KUDU-176: a tablet must recover properly when it is |
| // restarted after a major delta compaction. |
| TEST_F(TabletServerTest, TestKUDU_176_RecoveryAfterMajorDeltaCompaction) { |
| // Insert a single row and flush it into a DRS. |
| ANFF(InsertTestRowsRemote(1, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| ANFF(VerifyRows(schema_, { KeyValue(1, 1) })); |
| |
| // Update the row, then flush the resulting deltas. |
| ANFF(UpdateTestRowRemote(1, 2)); |
| ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS()); |
| ANFF(VerifyRows(schema_, { KeyValue(1, 2) })); |
| |
| // Run a major delta compaction over columns 1 and 2 of the first rowset. |
| { |
| const auto tablet = tablet_replica_->tablet(); |
| vector<shared_ptr<tablet::RowSet> > rowsets; |
| tablet->GetRowSetsForTests(&rowsets); |
| vector<ColumnId> compacted_cols = { tablet->schema()->column_id(1), |
| tablet->schema()->column_id(2) }; |
| ASSERT_OK(tablet->DoMajorDeltaCompaction(compacted_cols, rowsets[0])); |
| } |
| |
| // The compaction must not change the visible data. |
| ANFF(VerifyRows(schema_, { KeyValue(1, 2) })); |
| |
| // The data must also survive a restart. |
| ASSERT_OK(ShutdownAndRebuildTablet()); |
| ANFF(VerifyRows(schema_, { KeyValue(1, 2) })); |
| } |
| |
| // Regression test for KUDU-1341: during bootstrap there is a DELETE for a row |
| // which is still live in multiple on-disk rowsets. |
| TEST_F(TabletServerTest, TestKUDU_1341) { |
| for (int round = 0; round < 3; ++round) { |
| // Insert the row and flush it. |
| ANFF(InsertTestRowsRemote(1, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| |
| // Update and then delete the row (in DMS). |
| ANFF(UpdateTestRowRemote(1, round)); |
| ANFF(DeleteTestRowsRemote(1, 1)); |
| } |
| |
| // Insert the row once more, update it in the MRS before flushing, and |
| // then flush. |
| ANFF(InsertTestRowsRemote(1, 1)); |
| ANFF(UpdateTestRowRemote(1, 12345)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| ANFF(VerifyRows(schema_, { KeyValue(1, 12345) })); |
| |
| // The row must survive a restart, and a flush after the restart. |
| ASSERT_OK(ShutdownAndRebuildTablet()); |
| ANFF(VerifyRows(schema_, { KeyValue(1, 12345) })); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| ANFF(VerifyRows(schema_, { KeyValue(1, 12345) })); |
| |
| // The row must also survive a full compaction after the restart. |
| ASSERT_OK(tablet_replica_->tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); |
| ANFF(VerifyRows(schema_, { KeyValue(1, 12345) })); |
| } |
| |
| // Verifies that the response to a write carrying a request ID, including its |
| // per-row errors, is still returned for later attempts of the same request |
| // after the tablet has been restarted. |
| TEST_F(TabletServerTest, TestExactlyOnceForErrorsAcrossRestart) { |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| RpcController rpc; |
| |
| // Set up a request to insert two rows. |
| req.set_tablet_id(kTabletId); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 1234, 5678, "hello world via RPC", |
| req.mutable_row_operations()); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 12345, 5679, "hello world via RPC2", |
| req.mutable_row_operations()); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| // Insert it, assuming no errors. |
| { |
| SCOPED_TRACE(req.DebugString()); |
| ASSERT_OK(proxy_->Write(req, &resp, &rpc)); |
| SCOPED_TRACE(resp.DebugString()); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_EQ(0, resp.per_row_errors_size()); |
| } |
| |
| // Set up a RequestID to use in the later requests. |
| rpc::RequestIdPB req_id; |
| req_id.set_client_id("client-id"); |
| req_id.set_seq_no(1); |
| req_id.set_first_incomplete_seq_no(1); |
| req_id.set_attempt_no(1); |
| |
| // Insert the rows again, with the request ID specified. Both rows already |
| // exist, so we should expect an "ALREADY_PRESENT" error for each of them. |
| { |
| rpc.Reset(); |
| rpc.SetRequestIdPB(std::make_unique<rpc::RequestIdPB>(req_id)); |
| ASSERT_OK(proxy_->Write(req, &resp, &rpc)); |
| SCOPED_TRACE(resp.DebugString()); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_EQ(2, resp.per_row_errors_size()); |
| } |
| |
| // Restart the tablet server several times, and after each restart, send a new attempt of the |
| // same request. We make the request itself invalid by clearing the schema and ops, but |
| // that shouldn't matter since it's just hitting the ResultTracker and returning the |
| // cached response. If the ResultTracker didn't have a cached response, then we'd get an |
| // error about an invalid request. |
| req.clear_schema(); |
| req.clear_row_operations(); |
| for (int i = 1; i <= 5; i++) { |
| SCOPED_TRACE(Substitute("restart attempt #$0", i)); |
| // The rebuild returns a status; assert on it so that a failed restart is |
| // reported here rather than silently dropped. |
| ASSERT_OK(ShutdownAndRebuildTablet()); |
| rpc.Reset(); |
| req_id.set_attempt_no(req_id.attempt_no() + 1); |
| rpc.SetRequestIdPB(std::make_unique<rpc::RequestIdPB>(req_id)); |
| ASSERT_OK(proxy_->Write(req, &resp, &rpc)); |
| SCOPED_TRACE(resp.DebugString()); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_EQ(2, resp.per_row_errors_size()); |
| } |
| } |
| |
| // Regression test for KUDU-177: after a major delta compaction, edits that |
| // were in the old DRS's DMS must be properly replayed. |
| TEST_F(TabletServerTest, TestKUDU_177_RecoveryOfDMSEditsAfterMajorDeltaCompaction) { |
| // Insert a single row and flush it into a DRS. |
| ANFF(InsertTestRowsRemote(1, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| ANFF(VerifyRows(schema_, { KeyValue(1, 1) })); |
| |
| // Update the row, then flush the resulting deltas. |
| ANFF(UpdateTestRowRemote(1, 2)); |
| ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS()); |
| |
| // Update once more, so that this last update stays in the DMS. |
| ANFF(UpdateTestRowRemote(1, 3)); |
| ANFF(VerifyRows(schema_, { KeyValue(1, 3) })); |
| |
| // Run a major delta compaction. It doesn't include the DMS, but the old |
| // DMS should "move over" to the output of the delta compaction. |
| { |
| const auto tablet = tablet_replica_->tablet(); |
| vector<shared_ptr<tablet::RowSet> > rowsets; |
| tablet->GetRowSetsForTests(&rowsets); |
| vector<ColumnId> compacted_cols = { tablet->schema()->column_id(1), |
| tablet->schema()->column_id(2) }; |
| ASSERT_OK(tablet->DoMajorDeltaCompaction(compacted_cols, rowsets[0])); |
| } |
| // The compaction must not change the visible data. |
| ANFF(VerifyRows(schema_, { KeyValue(1, 3) })); |
| |
| // The last update must survive a restart. |
| ASSERT_OK(ShutdownAndRebuildTablet()); |
| ANFF(VerifyRows(schema_, { KeyValue(1, 3) })); |
| } |
| |
| // Corrupts the tablet's active log segment so that rebuilding the tablet fails, |
| // then checks that a client write gets a TABLET_FAILED error back and that the |
| // replica's status message reports the corruption. |
| TEST_F(TabletServerTest, TestClientGetsErrorBackWhenRecoveryFailed) { |
| ANFF(InsertTestRowsRemote(1, 7)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| |
| // Shutting down the tablet destroys the TabletReplica, so remember the path |
| // of its active log segment first. |
| const string active_segment_path = tablet_replica_->log()->ActiveSegmentPathForTests(); |
| ShutdownTablet(); |
| |
| ASSERT_OK(log::CorruptLogFile(env_, active_segment_path, log::FLIP_BYTE, 300)); |
| |
| // Rebuilding on top of the corrupted log is expected to fail. |
| ASSERT_FALSE(ShutdownAndRebuildTablet().ok()); |
| |
| // Reconnect to the server. |
| CreateTsClientProxies(mini_server_->bound_rpc_addr(), |
| client_messenger_, |
| &tablet_copy_proxy_, &proxy_, &admin_proxy_, &consensus_proxy_, |
| &generic_proxy_); |
| |
| WriteRequestPB request; |
| WriteResponsePB response; |
| rpc::RpcController rpc; |
| request.set_tablet_id(kTabletId); |
| |
| // The RPC itself goes through, but the response must carry the failure. |
| ASSERT_OK(DCHECK_NOTNULL(proxy_.get())->Write(request, &response, &rpc)); |
| ASSERT_EQ(TabletServerErrorPB::TABLET_FAILED, response.error().code()); |
| ASSERT_STR_CONTAINS(response.error().status().message(), "Tablet not RUNNING: FAILED"); |
| |
| // The TabletReplica's status message must have been updated with the failure. |
| ASSERT_STR_CONTAINS(tablet_replica_->last_status(), |
| "Log file corruption detected"); |
| } |
| |
| // Opens a scanner over all columns, drains it, and checks the returned rows, |
| // the scanner's registration in the scanner manager, and the active-scanner |
| // metrics before and after the drain. |
| TEST_F(TabletServerTest, TestReadLatest) { |
| const int num_rows = AllowSlowTests() ? 10000 : 1000; |
| InsertTestRowsDirect(0, num_rows); |
| |
| // Instantiate the server-level and tablet-level scanner metrics. |
| ASSERT_TRUE(mini_server_->server()->metric_entity()); |
| // The function passed here is irrelevant, since the metric is already instantiated. |
| auto server_scanners_gauge = METRIC_active_scanners.InstantiateFunctionGauge( |
| mini_server_->server()->metric_entity(), [] { return 0; }); |
| scoped_refptr<TabletReplica> replica; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &replica)); |
| ASSERT_TRUE(replica->tablet()->GetMetricEntity()); |
| scoped_refptr<AtomicGauge<size_t>> tablet_scanners_gauge = |
| METRIC_tablet_active_scanners.Instantiate(replica->tablet()->GetMetricEntity(), 0); |
| |
| ScanResponsePB open_resp; |
| NO_FATALS(OpenScannerWithAllColumns(&open_resp)); |
| |
| // A scanner ID must have come back and must be known to the scanner manager. |
| const string scanner_id = open_resp.scanner_id(); |
| ASSERT_FALSE(scanner_id.empty()); |
| { |
| SharedScanner found; |
| TabletServerErrorPB::Code lookup_error; |
| ASSERT_OK(mini_server_->server()->scanner_manager()->LookupScanner( |
| scanner_id, proxy_->user_credentials().real_user(), &lookup_error, &found)); |
| } |
| |
| // The scanner must show up in both the server's and the tablet's metrics. |
| ASSERT_EQ(1, server_scanners_gauge->value()); |
| ASSERT_EQ(1, tablet_scanners_gauge->value()); |
| |
| // Drain every row from the scanner and compare against the expected rows. |
| vector<string> scanned_rows; |
| NO_FATALS(DrainScannerToStrings(scanner_id, schema_, &scanned_rows)); |
| ASSERT_EQ(num_rows, scanned_rows.size()); |
| |
| KuduPartialRow expected_row(&schema_); |
| for (int key = 0; key < num_rows; ++key) { |
| BuildTestRow(key, &expected_row); |
| const string expected_str = "(" + expected_row.ToString() + ")"; |
| ASSERT_EQ(expected_str, scanned_rows[key]); |
| } |
| |
| // Once drained, the scanner should have been removed from the scanner |
| // manager automatically. |
| { |
| SharedScanner found; |
| TabletServerErrorPB::Code lookup_error; |
| const auto lookup_status = mini_server_->server()->scanner_manager()->LookupScanner( |
| scanner_id, proxy_->user_credentials().real_user(), &lookup_error, &found); |
| ASSERT_TRUE(lookup_status.IsNotFound()); |
| ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, lookup_error); |
| } |
| |
| // With the scanner unregistered, both metrics must be back at zero. |
| ASSERT_EQ(0, server_scanners_gauge->value()); |
| ASSERT_EQ(0, tablet_scanners_gauge->value()); |
| } |
| |
| // Tablet server test fixture parameterized on a ReadMode. |
| class ExpiredScannerParamTest : public TabletServerTest, |
| public ::testing::WithParamInterface<ReadMode> {}; |
| |
| // Opens a scanner in the parameterized read mode, lets it expire, and checks |
| // that continuing the scan fails with SCANNER_EXPIRED and that the expiration |
| // is logged. |
| TEST_P(ExpiredScannerParamTest, Test) { |
| // Make scanners expire quickly. |
| FLAGS_scanner_ttl_ms = 1; |
| |
| constexpr int kNumRows = 100; |
| InsertTestRowsDirect(0, kNumRows); |
| |
| // Instantiate the expired-scanners counter. |
| ASSERT_TRUE(mini_server_->server()->metric_entity()); |
| scoped_refptr<Counter> expired_counter = METRIC_scanners_expired.Instantiate( |
| mini_server_->server()->metric_entity()); |
| |
| // No scanner has been opened yet, so none can have expired. |
| ASSERT_EQ(0, expired_counter->value()); |
| |
| // Capture the glog output so that the scanner expiration message can be |
| // checked later on. |
| StringVectorSink log_sink; |
| ScopedRegisterSink sink_registration(&log_sink); |
| |
| // Open a scanner, but do not read from it. |
| ScanResponsePB scan_resp; |
| NO_FATALS(OpenScannerWithAllColumns(&scan_resp, GetParam())); |
| |
| // The scanner should expire after a short time. |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_EQ(1, expired_counter->value()); |
| }); |
| |
| // Continuing the scan must now yield a SCANNER_EXPIRED error. |
| ScanRequestPB continue_req; |
| RpcController rpc; |
| continue_req.set_scanner_id(scan_resp.scanner_id()); |
| continue_req.set_call_seq_id(1); |
| scan_resp.Clear(); |
| ASSERT_OK(proxy_->Scan(continue_req, &scan_resp, &rpc)); |
| ASSERT_TRUE(scan_resp.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, scan_resp.error().code()); |
| ASSERT_STR_MATCHES(scan_resp.error().status().message(), "Scanner .* not found"); |
| |
| ASSERT_STRINGS_ANY_MATCH(log_sink.logged_msgs(), "Scan: .* Scanner .* not found .* remote="); |
| } |
| |
| // Read modes used to instantiate the ReadMode-parameterized tests in this file. |
| static const ReadMode kReadModes[] = { READ_LATEST, READ_AT_SNAPSHOT, READ_YOUR_WRITES }; |
| |
| // Run ExpiredScannerParamTest once for each read mode in kReadModes. |
| INSTANTIATE_TEST_SUITE_P(Params, ExpiredScannerParamTest, ::testing::ValuesIn(kReadModes)); |
| |
| // Tablet server test fixture parameterized on a ReadMode. |
| class ScanCorruptedDeltasParamTest : public TabletServerTest, |
| public ::testing::WithParamInterface<ReadMode> {}; |
| |
| // Replaces the tablet's undo delta blocks with corrupted copies, rebuilds the |
| // tablet, and checks the errors returned to scans in the parameterized read |
| // mode. |
| TEST_P(ScanCorruptedDeltasParamTest, Test) { |
| // Get some rows onto disk together with deltas. |
| InsertTestRowsDirect(0, 100); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| UpdateTestRowRemote(1, 100); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| |
| // For every undo delta block, make a corrupted copy and point the superblock |
| // at the copy instead of the original. |
| TabletSuperBlockPB superblock; |
| tablet_replica_->tablet()->metadata()->ToSuperBlock(&superblock); |
| FsManager* fs = mini_server_->server()->fs_manager(); |
| for (auto& rowset : *superblock.mutable_rowsets()) { |
| for (auto& undo_delta : *rowset.mutable_undo_deltas()) { |
| BlockId original_id(undo_delta.block().id()); |
| BlockId corrupt_id; |
| ASSERT_OK(CreateCorruptBlock(fs, original_id, 0, 0, &corrupt_id)); |
| undo_delta.mutable_block()->set_id(corrupt_id.id()); |
| } |
| } |
| |
| // Look up the tablet metadata path while the replica is still around. |
| const string superblock_path = fs->GetTabletMetadataPath(tablet_replica_->tablet_id()); |
| ShutdownTablet(); |
| |
| // Persist the rewritten superblock and rebuild the server on top of it. |
| ASSERT_OK(pb_util::WritePBContainerToPath(env_, |
| superblock_path, superblock, pb_util::OVERWRITE, pb_util::SYNC, pb_util::SENSITIVE)); |
| ASSERT_OK(ShutdownAndRebuildTablet()); |
| LOG(INFO) << Substitute("Rebuilt tablet $0 with broken blocks", tablet_replica_->tablet_id()); |
| |
| // Prepare a new scan of all columns in the parameterized read mode. |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| NewScanRequestPB* new_scan = req.mutable_new_scan_request(); |
| new_scan->set_tablet_id(kTabletId); |
| new_scan->set_read_mode(GetParam()); |
| ASSERT_OK(SchemaToColumnPBs(schema_, new_scan->mutable_projected_columns())); |
| |
| // The first call should attempt to init the corrupted deltafiles and return |
| // with an error. Subsequent calls should see that the previous init failed |
| // and should return an appropriate error. |
| // |
| // Snapshot scans may be waiting in MVCC when the tablet fails; in that case |
| // the error is slightly different. |
| { |
| req.set_batch_size_bytes(10000); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| const auto& status_pb = resp.error().status(); |
| switch (status_pb.code()) { |
| case AppStatusPB::CORRUPTION: { |
| ASSERT_STR_CONTAINS(status_pb.message(), "failed to init CFileReader"); |
| break; |
| } |
| case AppStatusPB::ABORTED: { |
| ASSERT_STR_CONTAINS(status_pb.message(), "MVCC is closed"); |
| break; |
| } |
| default: { |
| FAIL() << "Unexpected failure"; |
| } |
| } |
| } |
| |
| // The tablet ends up transitioning to a failed state, after which scans |
| // yield "not running" errors. |
| for (int attempt = 0; attempt < 2; ++attempt) { |
| rpc.Reset(); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_TRUE(resp.has_error()); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| const auto& status_pb = resp.error().status(); |
| switch (status_pb.code()) { |
| case AppStatusPB::ILLEGAL_STATE: { |
| ASSERT_STR_CONTAINS(status_pb.message(), "Tablet not RUNNING"); |
| break; |
| } |
| case AppStatusPB::ABORTED: { |
| ASSERT_STR_CONTAINS(status_pb.message(), "MVCC is closed"); |
| break; |
| } |
| default: { |
| FAIL() << "Unexpected failure"; |
| } |
| } |
| } |
| } |
| |
| // Run ScanCorruptedDeltasParamTest once for each read mode in kReadModes. |
| INSTANTIATE_TEST_SUITE_P(Params, ScanCorruptedDeltasParamTest, ::testing::ValuesIn(kReadModes)); |
| |
| // Tablet server test fixture parameterized on a ReadMode. |
| class ScannerOpenWhenServerShutsDownParamTest : public TabletServerTest, |
| public ::testing::WithParamInterface<ReadMode> {}; |
| // Leaves a scanner open, in the parameterized read mode, when the test ends |
| // and the tablet server is shut down. |
| TEST_P(ScannerOpenWhenServerShutsDownParamTest, Test) { |
| // Write rows and flush, then update and flush again, so that there are some |
| // rows in MRS and DRS. |
| InsertTestRowsDirect(0, 100); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| UpdateTestRowRemote(1, 100); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| |
| ScanResponsePB open_resp; |
| NO_FATALS(OpenScannerWithAllColumns(&open_resp, GetParam())); |
| |
| // The scanner is now open and the TS is shut down with it still out there. |
| // Due to KUDU-161 this used to fail, since the scanner (and thus the MRS) |
| // stayed open longer than the anchor registry. |
| } |
| |
| // Run ScannerOpenWhenServerShutsDownParamTest once for each read mode in kReadModes. |
| INSTANTIATE_TEST_SUITE_P(Params, ScannerOpenWhenServerShutsDownParamTest, |
| ::testing::ValuesIn(kReadModes)); |
| |
| // Writes rows in several batches, recording each write's timestamp, and then |
| // runs one snapshot scan per recorded timestamp, checking that each scan |
| // returns exactly the rows written up to and including that batch. |
| TEST_F(TabletServerTest, TestSnapshotScan) { |
| const int num_rows = AllowSlowTests() ? 1000 : 100; |
| const int num_batches = AllowSlowTests() ? 100 : 10; |
| const int rows_per_batch = num_rows / num_batches; |
| |
| // Perform a series of writes and collect their timestamps. |
| vector<uint64_t> write_timestamps; |
| InsertTestRowsRemote(0, num_rows, num_batches, nullptr, |
| kTabletId, &write_timestamps); |
| |
| // Now perform one snapshot scan per collected timestamp. |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| int batches_visible = 0; |
| for (const uint64_t write_ts : write_timestamps) { |
| ++batches_visible; |
| req.Clear(); |
| resp.Clear(); |
| rpc.Reset(); |
| |
| // New scan request with no predicates, projecting all columns. |
| NewScanRequestPB* new_scan = req.mutable_new_scan_request(); |
| new_scan->set_tablet_id(kTabletId); |
| new_scan->set_read_mode(READ_AT_SNAPSHOT); |
| |
| // Decode and re-encode the timestamp. A snapshot at 'write_ts' does not |
| // include the rows written at it, so read at the next timestamp to make |
| // sure those rows come back. |
| const Timestamp read_ts(Timestamp(write_ts).value() + 1); |
| new_scan->set_snap_timestamp(read_ts.ToUint64()); |
| |
| ASSERT_OK(SchemaToColumnPBs(schema_, new_scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| |
| const Timestamp before_scan = mini_server_->server()->clock()->Now(); |
| // Send the call. |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // 'propagated_timestamp' must be set for successful responses, and it must |
| // lie between the clock readings taken before and after the scan. |
| ASSERT_TRUE(resp.has_propagated_timestamp()); |
| ASSERT_GT(mini_server_->server()->clock()->Now().ToUint64(), |
| resp.propagated_timestamp()); |
| ASSERT_LT(before_scan.ToUint64(), resp.propagated_timestamp()); |
| |
| ASSERT_TRUE(resp.has_more_results()); |
| // Drain all the rows from the scanner. |
| vector<string> rows; |
| NO_FATALS(DrainScannerToStrings(resp.scanner_id(), schema_, &rows)); |
| // Each scan should return rows_per_batch * batches_visible rows. |
| const int expected_row_count = rows_per_batch * batches_visible; |
| ASSERT_EQ(expected_row_count, rows.size()); |
| |
| if (VLOG_IS_ON(2)) { |
| VLOG(2) << Substitute("Scanner: $0 performing a snapshot read at $1 got back: ", |
| resp.scanner_id(), read_ts.ToString()); |
| for (const string& row : rows) { |
| VLOG(2) << row; |
| } |
| } |
| |
| // The first and last rows must be the expected ones. |
| ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", rows.front()); |
| ASSERT_EQ(Substitute(R"((int32 key=$0, int32 int_val=$0, string string_val="original$0"))", |
| (batches_visible * rows_per_batch - 1)), rows.back()); |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestSnapshotScan_WithoutSnapshotTimestamp) { |
| vector<uint64_t> write_timestamps_collector; |
| // perform a write |
| InsertTestRowsRemote(0, 1, 1, nullptr, kTabletId, &write_timestamps_collector); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| // Set up a new request with no predicates, all columns. |
| const Schema& projection = schema_; |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| scan->set_read_mode(READ_AT_SNAPSHOT); |
| |
| const Timestamp pre_scan_ts = mini_server_->server()->clock()->Now(); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // make sure that the snapshot timestamp that was selected is >= now |
| ASSERT_GE(resp.snap_timestamp(), pre_scan_ts.ToUint64()); |
| // The 'propagated_timestamp' field must be set for all successful responses. |
| ASSERT_TRUE(resp.has_propagated_timestamp()); |
| ASSERT_GT(mini_server_->server()->clock()->Now().ToUint64(), |
| resp.propagated_timestamp()); |
| ASSERT_LT(pre_scan_ts.ToUint64(), resp.propagated_timestamp()); |
| // The propagated timestamp should be after (i.e. greater) than the scan |
| // timestamp. |
| ASSERT_GT(resp.propagated_timestamp(), resp.snap_timestamp()); |
| } |
| |
| // Tests that a snapshot in the future (beyond the current time plus maximum |
| // synchronization error) fails as an invalid snapshot. |
| TEST_F(TabletServerTest, TestSnapshotScan_SnapshotInTheFutureFails) { |
| vector<uint64_t> write_timestamps_collector; |
| // perform a write |
| InsertTestRowsRemote(0, 1, 1, nullptr, kTabletId, &write_timestamps_collector); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| // Set up a new request with no predicates, all columns. |
| const Schema& projection = schema_; |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| scan->set_read_mode(READ_AT_SNAPSHOT); |
| |
| Timestamp read_timestamp(write_timestamps_collector[0]); |
| // Increment the write timestamp by 60 secs: the server will definitely consider |
| // this in the future. |
| read_timestamp = HybridClock::TimestampFromMicroseconds( |
| HybridClock::GetPhysicalValueMicros(read_timestamp) + 60000000); |
| scan->set_snap_timestamp(read_timestamp.ToUint64()); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::INVALID_SNAPSHOT, resp.error().code()); |
| } |
| } |
| |
| // Test retrying a snapshot scan using last_row. |
| TEST_F(TabletServerTest, TestSnapshotScan_LastRow) { |
| // Set the internal batching within the tserver to be small. Otherwise, |
| // even though we use a small batch size in our request, we'd end up reading |
| // many rows at a time. |
| FLAGS_scanner_batch_size_rows = 5; |
| const int num_rows = AllowSlowTests() ? 1000 : 100; |
| const int num_batches = AllowSlowTests() ? 10 : 5; |
| const int batch_size = num_rows / num_batches; |
| |
| // Generate some interleaved rows |
| for (int i = 0; i < batch_size; i++) { |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| for (int j = 0; j < num_rows; j++) { |
| if (j % batch_size == i) { |
| InsertTestRowsDirect(j, 1); |
| } |
| } |
| } |
| |
| // Remove all the key columns from the projection. |
| // This makes sure the scanner adds them in for sorting but removes them before returning |
| // to the client. |
| SchemaBuilder sb(schema_); |
| for (int i = 0; i < schema_.num_key_columns(); i++) { |
| sb.RemoveColumn(schema_.column(i).name()); |
| } |
| const Schema& projection = sb.BuildWithoutIds(); |
| |
| // Scan the whole tablet with a few different batch sizes. |
| for (int i = 1; i < 10000; i *= 2) { |
| ScanResponsePB resp; |
| ScanRequestPB req; |
| RpcController rpc; |
| |
| // Set up a new snapshot scan without a specified timestamp. |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| scan->set_read_mode(READ_AT_SNAPSHOT); |
| scan->set_order_mode(ORDERED); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| vector<string> results; |
| do { |
| rpc.Reset(); |
| // Send the call. |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| req.set_batch_size_bytes(i); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| // Save the rows into 'results' vector. |
| StringifyRowsFromResponse(projection, rpc, &resp, &results); |
| // Retry the scan, setting the last_row_key and snapshot based on the response. |
| scan->set_last_primary_key(resp.last_primary_key()); |
| scan->set_snap_timestamp(resp.snap_timestamp()); |
| } while (resp.has_more_results()); |
| |
| ASSERT_EQ(num_rows, results.size()); |
| |
| // Verify that we get the rows back in order. |
| KuduPartialRow row(&projection); |
| for (int j = 0; j < num_rows; j++) { |
| ASSERT_OK(row.SetInt32(0, j * 2)); |
| ASSERT_OK(row.SetStringCopy(1, StringPrintf("hello %d", j))); |
| string expected = "(" + row.ToString() + ")"; |
| ASSERT_EQ(expected, results[j]); |
| } |
| } |
| } |
| |
| // Tests that a read in the future succeeds if a propagated_timestamp (that is even |
| // further in the future) follows along. Also tests that the clock was updated so |
| // that no writes will ever have a timestamp post this snapshot. |
| TEST_F(TabletServerTest, TestSnapshotScan_SnapshotInTheFutureWithPropagatedTimestamp) { |
| vector<uint64_t> write_timestamps_collector; |
| // perform a write |
| InsertTestRowsRemote(0, 1, 1, nullptr, kTabletId, &write_timestamps_collector); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| // Set up a new request with no predicates, all columns. |
| const Schema& projection = schema_; |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| scan->set_read_mode(READ_AT_SNAPSHOT); |
| |
| Timestamp read_timestamp(write_timestamps_collector[0]); |
| // increment the write timestamp by 5 secs, the server will definitely consider |
| // this in the future. |
| read_timestamp = HybridClock::TimestampFromMicroseconds( |
| HybridClock::GetPhysicalValueMicros(read_timestamp) + 5000000); |
| scan->set_snap_timestamp(read_timestamp.ToUint64()); |
| |
| // send a propagated timestamp that is an additional 100 msecs into the future. |
| Timestamp propagated_timestamp = HybridClock::TimestampFromMicroseconds( |
| HybridClock::GetPhysicalValueMicros(read_timestamp) + 100000); |
| scan->set_propagated_timestamp(propagated_timestamp.ToUint64()); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // make sure the server's current clock returns a value that is larger than the |
| // propagated timestamp. It should have the same physical time, but higher |
| // logical time (due to various calls to clock.Now() when processing the request). |
| Timestamp now = mini_server_->server()->clock()->Now(); |
| |
| ASSERT_EQ(HybridClock::GetPhysicalValueMicros(propagated_timestamp), |
| HybridClock::GetPhysicalValueMicros(now)); |
| |
| ASSERT_GT(HybridClock::GetLogicalValue(now), |
| HybridClock::GetLogicalValue(propagated_timestamp)); |
| |
| vector<string> results; |
| NO_FATALS(DrainScannerToStrings(resp.scanner_id(), schema_, &results)); |
| ASSERT_EQ(1, results.size()); |
| ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]); |
| } |
| |
| |
| // Test that a read in the future fails, even if a propagated_timestamp is sent along, |
| // if the read_timestamp is beyond the propagated_timestamp. |
| TEST_F(TabletServerTest, TestSnapshotScan__SnapshotInTheFutureBeyondPropagatedTimestampFails) { |
| vector<uint64_t> write_timestamps_collector; |
| // perform a write |
| InsertTestRowsRemote(0, 1, 1, nullptr, kTabletId, &write_timestamps_collector); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| // Set up a new request with no predicates, all columns. |
| const Schema& projection = schema_; |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| scan->set_read_mode(READ_AT_SNAPSHOT); |
| |
| Timestamp read_timestamp(write_timestamps_collector[0]); |
| // increment the write timestamp by 60 secs, the server will definitely consider |
| // this in the future. |
| read_timestamp = HybridClock::TimestampFromMicroseconds( |
| HybridClock::GetPhysicalValueMicros(read_timestamp) + 60000000); |
| scan->set_snap_timestamp(read_timestamp.ToUint64()); |
| |
| // send a propagated timestamp that is an less than the read timestamp (but still |
| // in the future as far the server is concerned). |
| Timestamp propagated_timestamp = HybridClock::TimestampFromMicroseconds( |
| HybridClock::GetPhysicalValueMicros(read_timestamp) - 100000); |
| scan->set_propagated_timestamp(propagated_timestamp.ToUint64()); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::INVALID_SNAPSHOT, resp.error().code()); |
| } |
| } |
| |
| // Scan with READ_YOUR_WRITES mode to ensure it can |
| // satisfy read-your-writes/read-your-reads session guarantee. |
| TEST_F(TabletServerTest, TestScanYourWrites) { |
| vector<uint64_t> write_timestamps_collector; |
| const int kNumRows = 100; |
| // Perform a write. |
| InsertTestRowsRemote(0, kNumRows, 1, nullptr, kTabletId, &write_timestamps_collector); |
| |
| // Scan with READ_YOUR_WRITES mode and use the previous |
| // write response as the propagated timestamp. |
| ScanResponsePB resp; |
| uint64_t propagated_timestamp = write_timestamps_collector[0]; |
| ScanYourWritesTest(propagated_timestamp, &resp); |
| |
| // Store the returned snapshot timestamp as the propagated |
| // timestamp for the next read. |
| propagated_timestamp = resp.snap_timestamp(); |
| // Drain all the rows from the scanner. |
| vector<string> results; |
| NO_FATALS(DrainScannerToStrings(resp.scanner_id(), schema_, &results)); |
| ASSERT_EQ(kNumRows, results.size()); |
| ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]); |
| ASSERT_EQ(R"((int32 key=99, int32 int_val=99, string string_val="original99"))", results[99]); |
| |
| // Rescan the tablet to ensure READ_YOUR_WRITES mode can |
| // satisfy read-your-reads session guarantee. |
| ScanResponsePB new_resp; |
| ScanYourWritesTest(propagated_timestamp, &new_resp); |
| // Drain all the rows from the scanner. |
| results.clear(); |
| NO_FATALS(DrainScannerToStrings(new_resp.scanner_id(), schema_, &results)); |
| ASSERT_EQ(kNumRows, results.size()); |
| ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]); |
| ASSERT_EQ(R"((int32 key=99, int32 int_val=99, string string_val="original99"))", results[99]); |
| } |
| |
| // Tests that a read succeeds even without propagated_timestamp. |
| TEST_F(TabletServerTest, TestScanYourWrites_WithoutPropagatedTimestamp) { |
| vector<uint64_t> write_timestamps_collector; |
| // Perform a write. |
| InsertTestRowsRemote(0, 1, 1, nullptr, kTabletId, &write_timestamps_collector); |
| |
| ScanResponsePB resp; |
| ScanYourWritesTest(Timestamp::kMin.ToUint64(), &resp); |
| } |
| |
| // Tests that a read succeeds even with a future propagated_timestamp. Also |
| // tests that the clock was updated so that no writes will ever have a |
| // timestamp before this snapshot. |
| TEST_F(TabletServerTest, TestScanYourWrites_PropagatedTimestampInTheFuture) { |
| vector<uint64_t> write_timestamps_collector; |
| // Perform a write. |
| InsertTestRowsRemote(0, 1, 1, nullptr, kTabletId, &write_timestamps_collector); |
| |
| ScanResponsePB resp; |
| // Increment the write timestamp by 5 secs: the server will definitely consider |
| // this in the future. |
| Timestamp propagated_timestamp(write_timestamps_collector[0]); |
| propagated_timestamp = HybridClock::TimestampFromMicroseconds( |
| HybridClock::GetPhysicalValueMicros(propagated_timestamp) + 5000000); |
| ScanYourWritesTest(propagated_timestamp.ToUint64(), &resp); |
| |
| // Make sure the server's current clock returns a value that is larger than the |
| // propagated timestamp. It should have the same physical time, but higher |
| // logical time (due to various calls to clock.Now() when processing the request). |
| Timestamp now = mini_server_->server()->clock()->Now(); |
| |
| ASSERT_EQ(HybridClock::GetPhysicalValueMicros(propagated_timestamp), |
| HybridClock::GetPhysicalValueMicros(now)); |
| |
| ASSERT_GT(HybridClock::GetLogicalValue(now), |
| HybridClock::GetLogicalValue(propagated_timestamp)); |
| |
| vector<string> results; |
| NO_FATALS(DrainScannerToStrings(resp.scanner_id(), schema_, &results)); |
| ASSERT_EQ(1, results.size()); |
| ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]); |
| } |
| |
| // Test that, if multiple RPCs arrive concurrently for a single scanner, no state is |
| // corrupted, etc. We expected errors but no crashes or TSAN issues. |
| TEST_F(TabletServerTest, TestConcurrentAccessToOneScanner) { |
| constexpr int kNumRows = 1000; |
| constexpr int kNumThreads = 8; |
| // Perform a write. |
| InsertTestRowsRemote(0, kNumRows, 1, nullptr, kTabletId); |
| |
| FLAGS_scanner_batch_size_rows = 10; |
| FLAGS_scanner_unregister_on_invalid_seq_id = false; |
| ScanResponsePB open_resp; |
| NO_FATALS(OpenScannerWithAllColumns(&open_resp)); |
| |
| std::atomic<bool> done { false }; |
| vector<thread> threads; |
| // Add a thread which concurrently lists scans. The ListScans() function accesses |
| // scanners to expose diagnostic info such as stats, etc, so calling it in a loop can |
| // help uncover potential races in this code path. |
| threads.emplace_back( |
| [&]() { |
| while (!done) { |
| mini_server_->server()->scanner_manager()->ListScans(); |
| } |
| }); |
| std::atomic<int> next_seq_id { 1 }; |
| std::atomic<int> total_rows { 0 }; |
| for (int i = 0; i < kNumThreads; i++) { |
| threads.emplace_back( |
| [&]() { |
| while (!done) { |
| RpcController rpc; |
| rpc.set_timeout(MonoDelta::FromSeconds(10)); |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| req.set_scanner_id(open_resp.scanner_id()); |
| // Try either the expected next sequence ID or the following one. |
| // We sometimes try the following one since otherwise the race of two |
| // RPCs processing concurrently is very very narrow -- we increment the |
| // call ID immediately after looking up the scanner. |
| req.set_call_seq_id(next_seq_id + (rand() % 2)); |
| req.set_batch_size_bytes(100); |
| Status s = proxy_->Scan(req, &resp, &rpc); |
| CHECK_OK(s); |
| if (resp.has_error()) { |
| if (resp.error().code() == TabletServerErrorPB::SCANNER_EXPIRED) { |
| break; |
| } |
| CHECK(resp.error().code() == TabletServerErrorPB::INVALID_SCAN_CALL_SEQ_ID) |
| << "unexpected error: " << resp.error().DebugString(); |
| VLOG(1) << "Got an invalid seq id " << req.call_seq_id(); |
| } else { |
| VLOG(1) << "Continued scan: " << resp.data().num_rows() << " rows"; |
| total_rows += resp.data().num_rows(); |
| next_seq_id++; |
| if (!resp.has_more_results()) { |
| VLOG(1) << "Finished scan"; |
| done = true; |
| } |
| } |
| } |
| }); |
| } |
| for (auto& t : threads) { |
| t.join(); |
| } |
| ASSERT_EQ(total_rows, kNumRows); |
| } |
| |
| |
| TEST_F(TabletServerTest, TestScanWithStringPredicates) { |
| InsertTestRowsDirect(0, 100); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| |
| // Set up a range predicate: "hello 50" < string_val <= "hello 59" |
| ColumnRangePredicatePB* pred = scan->add_deprecated_range_predicates(); |
| pred->mutable_column()->CopyFrom(scan->projected_columns(2)); |
| |
| pred->set_lower_bound("hello 50"); |
| pred->set_inclusive_upper_bound("hello 59"); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // Drain all the rows from the scanner. |
| vector<string> results; |
| NO_FATALS( |
| DrainScannerToStrings(resp.scanner_id(), schema_, &results)); |
| ASSERT_EQ(10, results.size()); |
| ASSERT_EQ(R"((int32 key=50, int32 int_val=100, string string_val="hello 50"))", results[0]); |
| ASSERT_EQ(R"((int32 key=59, int32 int_val=118, string string_val="hello 59"))", results[9]); |
| } |
| |
| TEST_F(TabletServerTest, TestColumnarScan) { |
| const int kNumRows = 100; |
| InsertTestRowsDirect(0, kNumRows); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| |
| scan->set_row_format_flags(RowFormatFlags::COLUMNAR_LAYOUT); |
| rpc.RequireServerFeature(TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE); |
| // Send the call |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| |
| // Verify the response |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_EQ(3, resp.columnar_data().columns_size()); |
| |
| ASSERT_EQ(kNumRows, resp.columnar_data().num_rows()); |
| |
| // Verify column 0 (int32 key) |
| { |
| Slice col_data; |
| ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(0).data_sidecar(), &col_data)); |
| SCOPED_TRACE(col_data.ToDebugString()); |
| ASSERT_EQ(col_data.size(), kNumRows * sizeof(int32_t)); |
| ArrayView<const int32_t> cells(reinterpret_cast<const int32_t*>(col_data.data()), kNumRows); |
| for (int i = 0; i < kNumRows; i++) { |
| EXPECT_EQ(i, cells[i]); |
| } |
| } |
| |
| // Verify column 1 (int32 val) |
| { |
| Slice col_data; |
| ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(1).data_sidecar(), &col_data)); |
| SCOPED_TRACE(col_data.ToDebugString()); |
| ASSERT_EQ(col_data.size(), kNumRows * sizeof(int32_t)); |
| ArrayView<const int32_t> cells(reinterpret_cast<const int32_t*>(col_data.data()), kNumRows); |
| for (int i = 0; i < kNumRows; i++) { |
| EXPECT_EQ(i * 2, cells[i]); |
| } |
| } |
| // Verify column 2 (string) |
| { |
| Slice col_data; |
| ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).data_sidecar(), &col_data)); |
| |
| Slice varlen_data; |
| ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).varlen_data_sidecar(), |
| &varlen_data)); |
| |
| SCOPED_TRACE(col_data.ToDebugString()); |
| ASSERT_EQ(col_data.size(), (kNumRows + 1) * sizeof(uint32_t)); |
| ArrayView<const uint32_t> offsets(reinterpret_cast<const uint32_t*>(col_data.data()), |
| kNumRows + 1); |
| for (int i = 0; i < kNumRows; i++) { |
| Slice real_str(varlen_data.data() + offsets[i], |
| offsets[i + 1] - offsets[i]); |
| ASSERT_EQ(Substitute("hello $0", i), real_str); |
| } |
| } |
| } |
| |
| |
| TEST_F(TabletServerTest, TestNonPositiveLimitsShortCircuit) { |
| InsertTestRowsDirect(0, 10); |
| for (int limit : { -1, 0 }) { |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| // Set up a new scan request with non-positive limits. |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| scan->set_limit(limit); |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| { |
| // Send the request and make sure we get no rows back. |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| |
| // We're expecting the scan to have short circuited and for no scanner to |
| // exist for the scan, so we need to "drain" it a bit differently. |
| ASSERT_FALSE(resp.has_scanner_id()); |
| unique_ptr<RowwiseRowBlockPB> data(resp.release_data()); |
| ASSERT_EQ(0, data->num_rows()); |
| } |
| } |
| } |
| |
| // Randomized test that runs a few scans with varying limits. |
| TEST_F(TabletServerTest, TestRandomizedScanLimits) { |
| // Set a relatively small batch size... |
| const int kBatchSizeRows = rand() % 1000; |
| // ...and decent number of rows, such that we can get a good mix of |
| // multiple-batch and single-batch scans. |
| const int kNumRows = rand() % 2000; |
| FLAGS_scanner_batch_size_rows = kBatchSizeRows; |
| InsertTestRowsDirect(0, kNumRows); |
| LOG(INFO) << Substitute("Rows inserted: $0, batch size: $1", kNumRows, kBatchSizeRows); |
| |
| for (int i = 1; i < 100; i++) { |
| // To broaden a range of coverage, gradiate the max limit that we can set. |
| const int kMaxLimit = kNumRows * static_cast<double>(0.01 * i); |
| |
| // Get a random limit, capped by the max, inclusive. |
| // "kMaxLimit" cannot be 0, if it's 0, we set "kLimit" to 1 directly. |
| const int kLimit = kMaxLimit == 0 ? 1 : rand() % kMaxLimit + 1; |
| LOG(INFO) << "Scanning with a limit of " << kLimit; |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| scan->set_limit(kLimit); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| // Send the scan. |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| // Drain all the rows from the scanner. |
| vector<string> results; |
| NO_FATALS(DrainScannerToStrings(resp.scanner_id(), schema_, &results)); |
| ASSERT_EQ(results.size(), std::min({ kLimit, kNumRows })); |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestScanWithPredicates) { |
| int num_rows = AllowSlowTests() ? 10000 : 1000; |
| InsertTestRowsDirect(0, num_rows); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| |
| // Set up a range predicate: 51 <= key <= 100 |
| ColumnRangePredicatePB* pred = scan->add_deprecated_range_predicates(); |
| pred->mutable_column()->CopyFrom(scan->projected_columns(0)); |
| |
| int32_t lower_bound_int = 51; |
| int32_t upper_bound_int = 100; |
| pred->mutable_lower_bound()->append(reinterpret_cast<char*>(&lower_bound_int), |
| sizeof(lower_bound_int)); |
| pred->mutable_inclusive_upper_bound()->append(reinterpret_cast<char*>(&upper_bound_int), |
| sizeof(upper_bound_int)); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // Drain all the rows from the scanner. |
| vector<string> results; |
| NO_FATALS( |
| DrainScannerToStrings(resp.scanner_id(), schema_, &results)); |
| ASSERT_EQ(50, results.size()); |
| } |
| |
| TEST_F(TabletServerTest, TestScanWithEncodedPredicates) { |
| InsertTestRowsDirect(0, 100); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| |
| // Set up a range predicate: 51 <= key <= 60 |
| // using encoded keys |
| int32_t start_key_int = 51; |
| int32_t stop_key_int = 60; |
| Arena arena(64); |
| EncodedKeyBuilder ekb(&schema_, &arena); |
| ekb.AddColumnKey(&start_key_int); |
| EncodedKey* start_encoded = ekb.BuildEncodedKey(); |
| |
| ekb.Reset(); |
| ekb.AddColumnKey(&stop_key_int); |
| EncodedKey* stop_encoded = ekb.BuildEncodedKey(); |
| |
| scan->mutable_start_primary_key()->assign( |
| reinterpret_cast<const char*>(start_encoded->encoded_key().data()), |
| start_encoded->encoded_key().size()); |
| scan->mutable_stop_primary_key()->assign( |
| reinterpret_cast<const char*>(stop_encoded->encoded_key().data()), |
| stop_encoded->encoded_key().size()); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // Drain all the rows from the scanner. |
| vector<string> results; |
| NO_FATALS( |
| DrainScannerToStrings(resp.scanner_id(), schema_, &results)); |
| ASSERT_EQ(9, results.size()); |
| EXPECT_EQ(R"((int32 key=51, int32 int_val=102, string string_val="hello 51"))", |
| results.front()); |
| EXPECT_EQ(R"((int32 key=59, int32 int_val=118, string string_val="hello 59"))", |
| results.back()); |
| } |
| |
| TEST_F(TabletServerTest, TestScanWithSimplifiablePredicates) { |
| int num_rows = AllowSlowTests() ? 10000 : 1000; |
| InsertTestRowsDirect(0, num_rows); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| // Set up a projection without the key columns or the column after the last key column |
| SchemaBuilder sb(schema_); |
| for (int i = 0; i <= schema_.num_key_columns(); i++) { |
| sb.RemoveColumn(schema_.column(i).name()); |
| } |
| const Schema& projection = sb.BuildWithoutIds(); |
| ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); |
| |
| // Set up a key range predicate: 51 <= key < 100 |
| ColumnPredicatePB* key_predicate = scan->add_column_predicates(); |
| key_predicate->set_column(schema_.column(0).name()); |
| ColumnPredicatePB::Range* range = key_predicate->mutable_range(); |
| int32_t lower_bound_inclusive = 51; |
| int32_t upper_bound_exclusive = 100; |
| range->mutable_lower()->append( |
| reinterpret_cast<char*>(&lower_bound_inclusive), sizeof(lower_bound_inclusive)); |
| range->mutable_upper()->append( |
| reinterpret_cast<char*>(&upper_bound_exclusive), sizeof(upper_bound_exclusive)); |
| // Set up is not null predicate for not nullable column. |
| ColumnPredicatePB* is_not_null_predicate = scan->add_column_predicates(); |
| is_not_null_predicate->set_column(schema_.column(1).name()); |
| is_not_null_predicate->mutable_is_not_null(); |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // Ensure that the scanner includes correct columns. |
| { |
| auto scan_descriptors = mini_server_->server()->scanner_manager()->ListScans(); |
| ASSERT_EQ(1, projection.columns().size()); |
| ASSERT_EQ(1, scan_descriptors.size()); |
| ASSERT_EQ(projection.columns().size(), scan_descriptors[0].projected_columns.size()); |
| ASSERT_EQ(2, scan_descriptors[0].predicates.size()); |
| ASSERT_EQ(projection.columns().size(), scan_descriptors[0].iterator_stats.size()); |
| ASSERT_EQ(projection.column(0).name(), scan_descriptors[0].iterator_stats[0].first); |
| } |
| |
| // Drain all the rows from the scanner. |
| vector<string> results; |
| NO_FATALS( |
| DrainScannerToStrings(resp.scanner_id(), projection, &results)); |
| ASSERT_EQ(49, results.size()); |
| } |
| |
| // Test for diff scan RPC interface. |
| TEST_F(TabletServerTest, TestDiffScan) { |
| // Insert kNumRows rows with the usual pattern. |
| const int kStartRow = 0; |
| const int kNumRows = 1000; |
| const int kNumToUpdate = 200; |
| const int kNumToDelete = 100; |
| InsertTestRowsDirect(kStartRow, kNumRows); |
| Timestamp before_mutations = tablet_replica_->clock()->Now(); |
| |
| // Structure: key -> {val, is_deleted} |
| map<int32_t, pair<int32_t, bool>> expected; |
| |
| vector<int32_t> keys; |
| keys.reserve(kNumRows); |
| for (int32_t i = 0; i < kNumRows; i++) { |
| keys.emplace_back(i); |
| } |
| |
| // Update some random rows. |
| LocalTabletWriter writer(tablet_replica_->tablet(), &schema_); |
| std::mt19937 gen(SeedRandom()); |
| std::shuffle(keys.begin(), keys.end(), gen); |
| for (int i = 0; i < kNumToUpdate; i++) { |
| KuduPartialRow row(&schema_); |
| int32_t key = keys[i]; |
| CHECK_OK(row.SetInt32(0, key)); |
| int32_t new_val = key * 3; |
| CHECK_OK(row.SetInt32(1, new_val)); |
| InsertOrDie(&expected, key, pair<int32_t, bool>(new_val, false)); |
| CHECK_OK(writer.Update(row)); |
| } |
| |
| // Delete some random rows. |
| std::shuffle(keys.begin(), keys.end(), gen); |
| for (int i = 0; i < kNumToDelete; i++) { |
| KuduPartialRow row(&schema_); |
| int32_t key = keys[i]; |
| CHECK_OK(row.SetInt32(0, key)); |
| EmplaceOrUpdate(&expected, key, pair<int32_t, bool>(0 /* ignored */, true)); |
| CHECK_OK(writer.Delete(row)); |
| } |
| |
| Timestamp after_mutations = tablet_replica_->clock()->Now(); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| // Build a projection with an IS_DELETED column. |
| SchemaBuilder builder(*tablet_replica_->tablet()->schema()); |
| const bool kIsDeletedDefault = false; |
| ASSERT_OK(builder.AddColumn("is_deleted", IS_DELETED, |
| /*is_nullable=*/ false, |
| /*read_default=*/ &kIsDeletedDefault, |
| /*write_default=*/ nullptr)); |
| Schema projection = builder.BuildWithoutIds(); |
| |
| // Start scan. |
| auto* new_scan = req.mutable_new_scan_request(); |
| new_scan->set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToColumnPBs(projection, new_scan->mutable_projected_columns())); |
| new_scan->set_read_mode(READ_AT_SNAPSHOT); |
| new_scan->set_order_mode(ORDERED); |
| new_scan->set_snap_start_timestamp(before_mutations.ToUint64()); |
| new_scan->set_snap_timestamp(after_mutations.ToUint64()); |
| |
| int call_seq_id = 0; |
| { |
| req.set_call_seq_id(call_seq_id); |
| req.set_batch_size_bytes(0); // So it won't return data right away. |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_EQ(0, resp.data().num_rows()); |
| } |
| |
| // Consume the scan results and validate that the values are as expected. |
| req.clear_new_scan_request(); |
| req.set_scanner_id(resp.scanner_id()); |
| |
| vector<string> results; |
| while (resp.has_more_results()) { |
| rpc.Reset(); |
| req.set_call_seq_id(++call_seq_id); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| NO_FATALS(StringifyRowsFromResponse(projection, rpc, &resp, &results)); |
| } |
| |
| // Verify that the scan results match what we expected. |
| ASSERT_EQ(expected.size(), results.size()); |
| int i = 0; |
| for (const auto& entry : expected) { |
| int32_t key = entry.first; |
| int32_t val = entry.second.first; |
| bool is_deleted = entry.second.second; |
| string val_str = Substitute("$0", val); |
| if (is_deleted) { |
| val_str = ".*"; // Match any value on deleted values. |
| } |
| ASSERT_STR_MATCHES(results[i++], |
| Substitute("^\\(int32 key=$0, int32 int_val=$1, string string_val=\"hello $0\", " |
| "is_deleted is_deleted=$2\\)$$", key, val_str, is_deleted)); |
| } |
| } |
| |
| // Send various "bad" diff scan requests and validate that we catch the errors |
| // and respond with reasonable error messages. |
| TEST_F(TabletServerTest, TestDiffScanErrors) { |
| Timestamp before_insert = tablet_replica_->clock()->Now(); |
| InsertTestRowsDirect(/*start_row=*/0, /*num_rows=*/100); |
| Timestamp after_insert = tablet_replica_->clock()->Now(); |
| |
| // Build a projection with an IS_DELETED column. |
| SchemaBuilder builder(*tablet_replica_->tablet()->schema()); |
| const bool kIsDeletedDefault = false; |
| ASSERT_OK(builder.AddColumn("is_deleted", IS_DELETED, |
| /*is_nullable=*/ false, |
| /*read_default=*/ &kIsDeletedDefault, |
| /*write_default=*/ nullptr)); |
| Schema projection = builder.BuildWithoutIds(); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| // Set up the RPC request. |
| auto* new_scan = req.mutable_new_scan_request(); |
| new_scan->set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToColumnPBs(projection, new_scan->mutable_projected_columns())); |
| new_scan->set_snap_start_timestamp(before_insert.ToUint64()); |
| new_scan->set_snap_timestamp(after_insert.ToUint64()); |
| |
| req.set_call_seq_id(0); |
| req.set_batch_size_bytes(0); // So it won't return data right away. |
| |
| // Send a scan request to the server and assert |
| auto req_assert_invalid_argument = [&](const TabletServerErrorPB::Code expected_code, |
| const string& expected_msg) { |
| rpc.Reset(); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(expected_code, resp.error().code()) |
| << "Expected " << TabletServerErrorPB::Code_Name(expected_code) |
| << ", got " << TabletServerErrorPB::Code_Name(resp.error().code()); |
| Status s = StatusFromPB(resp.error().status()); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), expected_msg); |
| }; |
| |
| // Attempt to start a diff scan with an illegal scan mode. |
| for (ReadMode read_mode : {READ_YOUR_WRITES, READ_LATEST}) { |
| new_scan->set_read_mode(read_mode); |
| NO_FATALS(req_assert_invalid_argument(TabletServerErrorPB::INVALID_SCAN_SPEC, |
| "scan start timestamp is only supported in READ_AT_SNAPSHOT read mode")); |
| } |
| new_scan->set_read_mode(READ_AT_SNAPSHOT); |
| |
| // Attempt to start a diff scan with an illegal order mode. |
| new_scan->set_order_mode(UNORDERED); |
| NO_FATALS(req_assert_invalid_argument(TabletServerErrorPB::INVALID_SCAN_SPEC, |
| "scan start timestamp is only supported in ORDERED order mode")); |
| new_scan->set_order_mode(ORDERED); |
| |
| // Attempt to start a diff scan with a too-early start timestamp. |
| new_scan->set_snap_start_timestamp(0); // Way before the AHM. |
| NO_FATALS(req_assert_invalid_argument(TabletServerErrorPB::INVALID_SNAPSHOT, |
| "snapshot scan start timestamp is earlier than the ancient history mark")); |
| |
| // Attempt to start a diff scan with a too-early end timestamp. |
| new_scan->set_snap_timestamp(1); |
| NO_FATALS(req_assert_invalid_argument(TabletServerErrorPB::INVALID_SNAPSHOT, |
| "snapshot scan end timestamp is earlier than the ancient history mark")); |
| |
| // Attempt to start a diff scan with a start timestamp higher than the end |
| // timestamp. |
| new_scan->set_snap_start_timestamp(after_insert.ToUint64()); |
| new_scan->set_snap_timestamp(before_insert.ToUint64()); |
| NO_FATALS(req_assert_invalid_argument(TabletServerErrorPB::INVALID_SNAPSHOT, |
| "must be less than or equal to end timestamp")); |
| } |
| |
| // Test requesting more rows from a scanner which doesn't exist |
| TEST_F(TabletServerTest, TestBadScannerID) { |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| req.set_scanner_id("does-not-exist"); |
| |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, resp.error().code()); |
| } |
| |
| // Test passing a scanner ID, but also filling in some of the NewScanRequest |
| // field. |
| class InvalidScanRequest_NewScanAndScannerIDParamTest : |
| public TabletServerTest, |
| public ::testing::WithParamInterface<ReadMode> { |
| }; |
| TEST_P(InvalidScanRequest_NewScanAndScannerIDParamTest, Test) { |
| const ReadMode mode = GetParam(); |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| scan->set_read_mode(mode); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| req.set_scanner_id("x"); |
| SCOPED_TRACE(SecureDebugString(req)); |
| Status s = proxy_->Scan(req, &resp, &rpc); |
| ASSERT_FALSE(s.ok()); |
| ASSERT_STR_CONTAINS(s.ToString(), "Must not pass both a scanner_id and new_scan_request"); |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(Params, InvalidScanRequest_NewScanAndScannerIDParamTest, |
| testing::ValuesIn(kReadModes)); |
| |
| // Test that passing a projection with fields not present in the tablet schema |
| // throws an exception. |
| TEST_F(TabletServerTest, TestInvalidScanRequest_BadProjection) { |
| const Schema projection({ ColumnSchema("col_doesnt_exist", INT32) }, 0); |
| VerifyScanRequestFailure(projection, |
| TabletServerErrorPB::MISMATCHED_SCHEMA, |
| "Some columns are not present in the current schema: col_doesnt_exist"); |
| } |
| |
| // Test that passing a projection with mismatched type/nullability throws an exception. |
| TEST_F(TabletServerTest, TestInvalidScanRequest_BadProjectionTypes) { |
| Schema projection; |
| |
| // Verify mismatched nullability for the not-null int field |
| ASSERT_OK( |
| projection.Reset({ ColumnSchema("int_val", INT32, true) }, // should be NOT NULL |
| 0)); |
| VerifyScanRequestFailure(projection, |
| TabletServerErrorPB::MISMATCHED_SCHEMA, |
| "The column 'int_val' must have type INT32 NOT " |
| "NULL found INT32 NULLABLE"); |
| |
| // Verify mismatched nullability for the nullable string field |
| ASSERT_OK( |
| projection.Reset({ ColumnSchema("string_val", STRING, false) }, // should be NULLABLE |
| 0)); |
| VerifyScanRequestFailure(projection, |
| TabletServerErrorPB::MISMATCHED_SCHEMA, |
| "The column 'string_val' must have type STRING " |
| "NULLABLE found STRING NOT NULL"); |
| |
| // Verify mismatched type for the not-null int field |
| ASSERT_OK( |
| projection.Reset({ ColumnSchema("int_val", INT16, false) }, // should be INT32 NOT NULL |
| 0)); |
| VerifyScanRequestFailure(projection, |
| TabletServerErrorPB::MISMATCHED_SCHEMA, |
| "The column 'int_val' must have type INT32 NOT " |
| "NULL found INT16 NOT NULL"); |
| |
| // Verify mismatched type for the nullable string field |
| ASSERT_OK(projection.Reset( |
| { ColumnSchema("string_val", INT32, true) }, // should be STRING NULLABLE |
| 0)); |
| VerifyScanRequestFailure(projection, |
| TabletServerErrorPB::MISMATCHED_SCHEMA, |
| "The column 'string_val' must have type STRING " |
| "NULLABLE found INT32 NULLABLE"); |
| } |
| |
| TEST_F(TabletServerTest, TestInvalidScanRequest_UnknownOrderMode) { |
| NO_FATALS(InsertTestRowsDirect(0, 10)); |
| ScanRequestPB req; |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| scan->set_order_mode(OrderMode::UNKNOWN_ORDER_MODE); |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| NO_FATALS(VerifyScanRequestFailure(req, |
| TabletServerErrorPB::INVALID_SCAN_SPEC, |
| "Unknown order mode specified")); |
| } |
| |
| TEST_F(TabletServerTest, InvalidScanRequestNoGreaterKey) { |
| const int32_t key_val = INT32_MAX; |
| Arena arena(64); |
| EncodedKeyBuilder ekb(&schema_, &arena); |
| ekb.AddColumnKey(&key_val); |
| EncodedKey* key_encoded = ekb.BuildEncodedKey(); |
| |
| ScanRequestPB req; |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| scan->set_order_mode(OrderMode::ORDERED); |
| scan->set_read_mode(ReadMode::READ_AT_SNAPSHOT); |
| scan->set_last_primary_key(key_encoded->encoded_key().ToString()); |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| NO_FATALS(VerifyScanRequestFailure(req, |
| TabletServerErrorPB::INVALID_SCAN_SPEC, |
| "No lexicographically greater key exists")); |
| } |
| |
| // Test that passing a projection with Column IDs throws an exception. |
| // Column IDs are assigned to the user request schema on the tablet server |
| // based on the latest schema. |
| class InvalidScanRequest_WithIdsParamTest : |
| public TabletServerTest, |
| public ::testing::WithParamInterface<ReadMode> { |
| }; |
| TEST_P(InvalidScanRequest_WithIdsParamTest, Test) { |
| const SchemaPtr projection = tablet_replica_->tablet()->schema(); |
| ASSERT_TRUE(projection->has_column_ids()); |
| VerifyScanRequestFailure(*projection, |
| TabletServerErrorPB::INVALID_SCHEMA, |
| "User requests should not have Column IDs"); |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(Params, InvalidScanRequest_WithIdsParamTest, |
| testing::ValuesIn(kReadModes)); |
| |
| // Test scanning a tablet that has no entries. |
| TEST_F(TabletServerTest, TestScan_NoResults) { |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| // Set up a new request with no predicates, all columns. |
| const Schema& projection = schema_; |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| |
| // Because there are no entries, we should immediately return "no results". |
| ASSERT_FALSE(resp.has_more_results()); |
| } |
| } |
| |
| // Test scanning a tablet that has no entries. |
| class InvalidScanSeqIdParamTest : |
| public TabletServerTest, |
| public ::testing::WithParamInterface<ReadMode> { |
| }; |
| TEST_P(InvalidScanSeqIdParamTest, Test) { |
| const ReadMode mode = GetParam(); |
| InsertTestRowsDirect(0, 10); |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| |
| { |
| // Set up a new scan request with no predicates, all columns. |
| const Schema& projection = schema_; |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| scan->set_read_mode(mode); |
| ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| |
| // Create the scanner |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_TRUE(resp.has_more_results()); |
| } |
| |
| string scanner_id = resp.scanner_id(); |
| resp.Clear(); |
| |
| { |
| // Continue the scan with an invalid sequence ID |
| req.Clear(); |
| rpc.Reset(); |
| req.set_scanner_id(scanner_id); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| req.set_call_seq_id(42); // should be 1 |
| |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::INVALID_SCAN_CALL_SEQ_ID, resp.error().code()); |
| } |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(Params, InvalidScanSeqIdParamTest, |
| testing::ValuesIn(kReadModes)); |
| |
| // Regression test for KUDU-1789: when ScannerKeepAlive is called on a non-existent |
| // scanner, it should properly respond with an error. |
| TEST_F(TabletServerTest, TestScan_KeepAliveExpiredScanner) { |
| StringVectorSink capture_logs; |
| ScopedRegisterSink reg(&capture_logs); |
| |
| ScannerKeepAliveRequestPB req; |
| ScannerKeepAliveResponsePB resp; |
| RpcController rpc; |
| |
| rpc.set_timeout(MonoDelta::FromSeconds(5)); |
| req.set_scanner_id("does-not-exist"); |
| ASSERT_OK(proxy_->ScannerKeepAlive(req, &resp, &rpc)); |
| ASSERT_TRUE(resp.has_error()) << SecureShortDebugString(resp); |
| ASSERT_EQ(resp.error().code(), TabletServerErrorPB::SCANNER_EXPIRED); |
| ASSERT_STR_MATCHES(resp.error().status().message(), "Scanner .* not found"); |
| |
| ASSERT_STRINGS_ANY_MATCH(capture_logs.logged_msgs(), |
| "ScannerKeepAlive: .* Scanner .* not found .* remote="); |
| } |
| |
| void TabletServerTest::ScanYourWritesTest(uint64_t propagated_timestamp, |
| ScanResponsePB* resp) { |
| ScanRequestPB req; |
| |
| // Set up a new request with no predicates, all columns. |
| const Schema &projection = schema_; |
| NewScanRequestPB *scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| scan->set_read_mode(READ_YOUR_WRITES); |
| if (propagated_timestamp != Timestamp::kInvalidTimestamp.ToUint64()) { |
| scan->set_propagated_timestamp(propagated_timestamp); |
| } |
| ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| |
| { |
| RpcController rpc; |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Scan(req, resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(*resp)); |
| ASSERT_FALSE(resp->has_error()); |
| } |
| |
| // Make sure that the chosen snapshot timestamp is sent back and |
| // it is larger than the previous propagation timestamp. |
| ASSERT_TRUE(resp->has_snap_timestamp()); |
| ASSERT_LT(propagated_timestamp, resp->snap_timestamp()); |
| // The 'propagated_timestamp' field must be set for 'success' responses. |
| ASSERT_TRUE(resp->has_propagated_timestamp()); |
| ASSERT_TRUE(resp->has_more_results()); |
| } |
| |
| void TabletServerTest::DoOrderedScanTest(const Schema& projection, |
| const string& expected_rows_as_string) { |
| InsertTestRowsDirect(0, 10); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| InsertTestRowsDirect(10, 10); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| InsertTestRowsDirect(20, 10); |
| |
| ScanResponsePB resp; |
| ScanRequestPB req; |
| RpcController rpc; |
| |
| // Set up a new snapshot scan without a specified timestamp. |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); |
| req.set_call_seq_id(0); |
| scan->set_read_mode(READ_AT_SNAPSHOT); |
| scan->set_order_mode(ORDERED); |
| |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| req.set_batch_size_bytes(0); // so it won't return data right away |
| ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| vector<string> results; |
| NO_FATALS( |
| DrainScannerToStrings(resp.scanner_id(), projection, &results)); |
| |
| ASSERT_EQ(30, results.size()); |
| |
| for (int i = 0; i < results.size(); ++i) { |
| ASSERT_EQ(results[i], Substitute(expected_rows_as_string, i, i * 2)); |
| } |
| } |
| |
| // Tests for KUDU-967. This test creates multiple row sets and then performs an ordered |
| // scan including the key columns in the projection but without marking them as keys. |
| // Without a fix for KUDU-967 the scan will often return out-of-order results. |
| TEST_F(TabletServerTest, TestOrderedScan_ProjectionWithKeyColumnsInOrder) { |
| // Build a projection with all the columns, but don't mark the key columns as such. |
| SchemaBuilder sb; |
| for (int i = 0; i < schema_.num_columns(); i++) { |
| sb.AddColumn(schema_.column(i), false); |
| } |
| const Schema& projection = sb.BuildWithoutIds(); |
| DoOrderedScanTest(projection, |
| R"((int32 key=$0, int32 int_val=$1, string string_val="hello $0"))"); |
| } |
| |
| // Same as above but doesn't add the key columns to the projection. |
| TEST_F(TabletServerTest, TestOrderedScan_ProjectionWithoutKeyColumns) { |
| // Build a projection without the key columns. |
| SchemaBuilder sb; |
| for (int i = schema_.num_key_columns(); i < schema_.num_columns(); i++) { |
| sb.AddColumn(schema_.column(i), false); |
| } |
| const Schema& projection = sb.BuildWithoutIds(); |
| DoOrderedScanTest(projection, R"((int32 int_val=$1, string string_val="hello $0"))"); |
| } |
| |
| // Same as above but creates a projection with the order of columns reversed. |
| TEST_F(TabletServerTest, TestOrderedScan_ProjectionWithKeyColumnsOutOfOrder) { |
| // Build a projection with the order of the columns reversed. |
| SchemaBuilder sb; |
| for (int i = schema_.num_columns() - 1; i >= 0; i--) { |
| sb.AddColumn(schema_.column(i), false); |
| } |
| const Schema& projection = sb.BuildWithoutIds(); |
| DoOrderedScanTest(projection, |
| R"((string string_val="hello $0", int32 int_val=$1, int32 key=$0))"); |
| } |
| |
| TEST_F(TabletServerTest, TestSplitKeyRange) { |
| int kNumRowsets = 10; |
| int kRowsetSize = 10; |
| scoped_refptr<TabletReplica> replica; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &replica)); |
| for (int i = 0; i < kNumRowsets; i++) { |
| InsertTestRowsDirect(kRowsetSize * i, kRowsetSize); |
| ASSERT_OK(replica->tablet()->Flush()); |
| } |
| |
| { |
| SplitKeyRangeRequestPB req; |
| SplitKeyRangeResponsePB resp; |
| RpcController rpc; |
| req.set_tablet_id(kTabletId); |
| // Request the smallest possible chunk size, expecting we get back a range |
| // for every rowset. |
| req.set_target_chunk_size_bytes(1); |
| ColumnSchemaToPB(ColumnSchema("key", INT32), req.add_columns()); |
| ASSERT_OK(proxy_->SplitKeyRange(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_EQ(kNumRowsets, resp.ranges_size()); |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestAlterSchema) { |
| AlterSchemaRequestPB req; |
| AlterSchemaResponsePB resp; |
| RpcController rpc; |
| |
| InsertTestRowsRemote(0, 2); |
| |
| // Add one column with a default value |
| const int32_t c2_write_default = 5; |
| const int32_t c2_read_default = 7; |
| SchemaBuilder builder(schema_); |
| ASSERT_OK(builder.AddColumn("c2", INT32, false, &c2_read_default, &c2_write_default)); |
| Schema s2 = builder.Build(); |
| |
| req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid()); |
| req.set_tablet_id(kTabletId); |
| req.set_schema_version(1); |
| ASSERT_OK(SchemaToPB(s2, req.mutable_schema())); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(admin_proxy_->AlterSchema(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| { |
| InsertTestRowsRemote(2, 2); |
| scoped_refptr<TabletReplica> tablet; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
| ASSERT_OK(tablet->tablet()->Flush()); |
| } |
| |
| const Schema projection({ ColumnSchema("key", INT32), (ColumnSchema("c2", INT32)) }, 1); |
| |
| // Try recovering from the original log |
| NO_FATALS(ShutdownAndRebuildTablet()); |
| VerifyRows(projection, { KeyValue(0, 7), |
| KeyValue(1, 7), |
| KeyValue(2, 5), |
| KeyValue(3, 5) }); |
| |
| // Try recovering from the log generated on recovery |
| NO_FATALS(ShutdownAndRebuildTablet()); |
| VerifyRows(projection, { KeyValue(0, 7), |
| KeyValue(1, 7), |
| KeyValue(2, 5), |
| KeyValue(3, 5) }); |
| } |
| |
| // Adds a new column with no "write default", and then restarts the tablet |
| // server. Inserts that were made before the new column was added should |
| // still replay properly during bootstrap. |
| // |
| // Regression test for KUDU-181. |
| TEST_F(TabletServerTest, TestAlterSchema_AddColWithoutWriteDefault) { |
| AlterSchemaRequestPB req; |
| AlterSchemaResponsePB resp; |
| RpcController rpc; |
| |
| InsertTestRowsRemote(0, 2); |
| |
| // Add a column with a read-default but no write-default. |
| const uint32_t c2_read_default = 7; |
| SchemaBuilder builder(schema_); |
| ASSERT_OK(builder.AddColumn("c2", INT32, false, &c2_read_default, nullptr)); |
| Schema s2 = builder.Build(); |
| |
| req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid()); |
| req.set_tablet_id(kTabletId); |
| req.set_schema_version(1); |
| ASSERT_OK(SchemaToPB(s2, req.mutable_schema())); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(admin_proxy_->AlterSchema(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // Verify that the old data picked up the read default. |
| |
| const Schema projection({ ColumnSchema("key", INT32), ColumnSchema("c2", INT32) }, 1); |
| VerifyRows(projection, { KeyValue(0, 7), KeyValue(1, 7) }); |
| |
| // Try recovering from the original log |
| NO_FATALS(ShutdownAndRebuildTablet()); |
| VerifyRows(projection, { KeyValue(0, 7), KeyValue(1, 7) }); |
| |
| // Try recovering from the log generated on recovery |
| NO_FATALS(ShutdownAndRebuildTablet()); |
| VerifyRows(projection, { KeyValue(0, 7), KeyValue(1, 7) }); |
| } |
| |
| TEST_F(TabletServerTest, TestCreateTablet_TabletExists) { |
| CreateTabletRequestPB req; |
| CreateTabletResponsePB resp; |
| RpcController rpc; |
| |
| req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid()); |
| req.set_table_id("testtb"); |
| req.set_tablet_id(kTabletId); |
| PartitionPB* partition = req.mutable_partition(); |
| partition->set_partition_key_start(" "); |
| partition->set_partition_key_end(" "); |
| req.set_table_name("testtb"); |
| req.mutable_config()->CopyFrom(mini_server_->CreateLocalConfig()); |
| |
| Schema schema = SchemaBuilder(schema_).Build(); |
| ASSERT_OK(SchemaToPB(schema, req.mutable_schema())); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(admin_proxy_->CreateTablet(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::TABLET_ALREADY_EXISTS, resp.error().code()); |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestDeleteTablet) { |
| scoped_refptr<TabletReplica> tablet; |
| |
| // Verify that the tablet exists |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
| |
| // Fetch the metric for the number of on-disk blocks, so we can later verify |
| // that we actually remove data. |
| scoped_refptr<AtomicGauge<uint64_t> > ondisk = |
| METRIC_log_block_manager_blocks_under_management.Instantiate( |
| mini_server_->server()->metric_entity(), 0); |
| const int block_count_before_flush = ondisk->value(); |
| if (FLAGS_block_manager == "log") { |
| ASSERT_EQ(block_count_before_flush, 0); |
| } |
| |
| // Put some data in the tablet. We flush and insert more rows to ensure that |
| // there is data both in the MRS and on disk. |
| NO_FATALS(InsertTestRowsRemote(1, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| NO_FATALS(InsertTestRowsRemote(2, 1)); |
| |
| const int block_count_after_flush = ondisk->value(); |
| if (FLAGS_block_manager == "log") { |
| ASSERT_GT(block_count_after_flush, block_count_before_flush); |
| } |
| |
| // Drop any local references to the tablet from within this test, |
| // so that when we delete it on the server, it's not held alive |
| // by the test code. |
| tablet_replica_.reset(); |
| tablet.reset(); |
| |
| DeleteTabletRequestPB req; |
| DeleteTabletResponsePB resp; |
| RpcController rpc; |
| |
| req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid()); |
| req.set_tablet_id(kTabletId); |
| req.set_delete_type(tablet::TABLET_DATA_DELETED); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(admin_proxy_->DeleteTablet(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // Verify that the tablet is removed from the tablet map |
| ASSERT_FALSE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
| |
| // Verify that fetching metrics doesn't crash. Regression test for KUDU-638. |
| EasyCurl c; |
| faststring buf; |
| ASSERT_OK(c.FetchURL(strings::Substitute( |
| "http://$0/jsonmetricz", |
| mini_server_->bound_http_addr().ToString()), |
| &buf)); |
| |
| // Verify data was actually removed. |
| const int block_count_after_delete = ondisk->value(); |
| if (FLAGS_block_manager == "log") { |
| ASSERT_EQ(block_count_after_delete, 0); |
| } |
| |
| // Verify that after restarting the TS, the tablet is still not in the tablet manager. |
| // This ensures that the on-disk metadata got removed. |
| Status s = ShutdownAndRebuildTablet(); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_FALSE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
| } |
| |
| TEST_F(TabletServerTest, TestDeleteTablet_TabletNotCreated) { |
| DeleteTabletRequestPB req; |
| DeleteTabletResponsePB resp; |
| RpcController rpc; |
| |
| req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid()); |
| req.set_tablet_id("NotPresentTabletId"); |
| req.set_delete_type(tablet::TABLET_DATA_DELETED); |
| |
| // Send the call |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(admin_proxy_->DeleteTablet(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::TABLET_NOT_FOUND, resp.error().code()); |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestDeleteTabletBenchmark) { |
| // Collect some related metrics. |
| scoped_refptr<AtomicGauge<uint64_t>> block_count = |
| METRIC_log_block_manager_blocks_under_management.Instantiate( |
| mini_server_->server()->metric_entity(), 0); |
| scoped_refptr<AtomicGauge<uint64_t>> container = |
| METRIC_log_block_manager_containers.Instantiate( |
| mini_server_->server()->metric_entity(), 0); |
| scoped_refptr<Counter> holes_punched = |
| METRIC_log_block_manager_holes_punched.Instantiate( |
| mini_server_->server()->metric_entity()); |
| |
| // Put some data in the tablet. We insert rows and flush immediately to |
| // ensure that there is enough blocks on disk to run the benchmark. |
| for (int i = 0; i < FLAGS_delete_tablet_bench_num_flushes; i++) { |
| NO_FATALS(InsertTestRowsRemote(i, 1)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| } |
| const int block_count_before_delete = block_count->value(); |
| |
| // Drop any local references to the tablet from within this test, |
| // so that when we delete it on the server, it's not held alive |
| // by the test code. |
| tablet_replica_.reset(); |
| |
| DeleteTabletRequestPB req; |
| DeleteTabletResponsePB resp; |
| RpcController rpc; |
| |
| req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid()); |
| req.set_tablet_id(kTabletId); |
| req.set_delete_type(tablet::TABLET_DATA_DELETED); |
| |
| // Send the call and measure the time spent deleting the tablet. |
| LOG_TIMING(INFO, "deleting tablet") { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(admin_proxy_->DeleteTablet(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // Log the related metrics. |
| LOG(INFO) << "block_count_before_delete : " << block_count_before_delete; |
| LOG(INFO) << "log_block_manager_containers : " << container->value(); |
| LOG(INFO) << "log_block_manager_holes_punched : " << holes_punched->value(); |
| } |
| |
| // Test that with concurrent requests to delete the same tablet, one wins and |
| // the other fails, with no assertion failures. Regression test for KUDU-345. |
| TEST_F(TabletServerTest, TestConcurrentDeleteTablet) { |
| // Verify that the tablet exists |
| scoped_refptr<TabletReplica> tablet; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
| |
| static const int kNumDeletes = 2; |
| RpcController rpcs[kNumDeletes]; |
| DeleteTabletResponsePB responses[kNumDeletes]; |
| CountDownLatch latch(kNumDeletes); |
| |
| DeleteTabletRequestPB req; |
| req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid()); |
| req.set_tablet_id(kTabletId); |
| req.set_delete_type(tablet::TABLET_DATA_DELETED); |
| |
| for (int i = 0; i < kNumDeletes; i++) { |
| SCOPED_TRACE(SecureDebugString(req)); |
| admin_proxy_->DeleteTabletAsync(req, &responses[i], &rpcs[i], |
| [&]() { latch.CountDown(); }); |
| } |
| latch.Wait(); |
| |
| int num_success = 0; |
| for (int i = 0; i < kNumDeletes; i++) { |
| ASSERT_TRUE(rpcs[i].finished()); |
| LOG(INFO) << "STATUS " << i << ": " << rpcs[i].status().ToString(); |
| LOG(INFO) << "RESPONSE " << i << ": " << SecureDebugString(responses[i]); |
| if (!responses[i].has_error()) { |
| num_success++; |
| } |
| } |
| |
| // Verify that the tablet is removed from the tablet map |
| ASSERT_FALSE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
| ASSERT_EQ(1, num_success); |
| } |
| |
| // Validate the hostname and the UUID of the server from the metrics webpage |
| TEST_F(TabletServerTest, ServerAttributes) { |
| EasyCurl c; |
| faststring buf; |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/metrics?ids=kudu.tabletserver", |
| mini_server_->bound_http_addr().ToString()), |
| &buf)); |
| string raw = buf.ToString(); |
| string server_hostname; |
| ASSERT_STR_CONTAINS(raw, "\"uuid\": \"" + mini_server_->uuid() + "\""); |
| ASSERT_OK(GetFQDN(&server_hostname)); |
| ASSERT_STR_CONTAINS(raw, "\"hostname\": \"" + server_hostname + "\""); |
| } |
| |
| TEST_F(TabletServerTest, TestInsertLatencyMicroBenchmark) { |
| METRIC_DEFINE_entity(test); |
| METRIC_DEFINE_histogram(test, insert_latency, |
| "Insert Latency", |
| MetricUnit::kMicroseconds, |
| "TabletServer single threaded insert latency.", |
| kudu::MetricLevel::kInfo, |
| 10000000, |
| 2); |
| |
| scoped_refptr<Histogram> histogram = METRIC_insert_latency.Instantiate(ts_test_metric_entity_); |
| |
| int warmup = AllowSlowTests() ? |
| FLAGS_single_threaded_insert_latency_bench_warmup_rows : 10; |
| |
| for (int i = 0; i < warmup; i++) { |
| InsertTestRowsRemote(i, 1); |
| } |
| |
| int max_rows = AllowSlowTests() ? |
| FLAGS_single_threaded_insert_latency_bench_insert_rows : 100; |
| |
| MonoTime start = MonoTime::Now(); |
| |
| for (int i = warmup; i < warmup + max_rows; i++) { |
| MonoTime before = MonoTime::Now(); |
| InsertTestRowsRemote(i, 1); |
| MonoTime after = MonoTime::Now(); |
| MonoDelta delta = after - before; |
| histogram->Increment(delta.ToMicroseconds()); |
| } |
| |
| MonoTime end = MonoTime::Now(); |
| double throughput = ((max_rows - warmup) * 1.0) / (end - start).ToSeconds(); |
| |
| // Generate the JSON. |
| std::ostringstream out; |
| JsonWriter writer(&out, JsonWriter::PRETTY); |
| ASSERT_OK(histogram->WriteAsJson(&writer, MetricJsonOptions())); |
| |
| LOG(INFO) << "Throughput: " << throughput << " rows/sec."; |
| LOG(INFO) << out.str(); |
| } |
| |
| // Simple test to ensure we can destroy an RpcServer in different states of |
| // initialization before Start()ing it. |
| TEST_F(TabletServerTest, TestRpcServerCreateDestroy) { |
| RpcServerOptions opts; |
| { |
| RpcServer server(opts); |
| } |
| { |
| RpcServer server(opts); |
| MessengerBuilder mb("foo"); |
| shared_ptr<Messenger> messenger; |
| ASSERT_OK(mb.Build(&messenger)); |
| ASSERT_OK(server.Init(messenger)); |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestWriteOutOfBounds) { |
| const char *tabletId = "TestWriteOutOfBoundsTablet"; |
| Schema schema = SchemaBuilder(schema_).Build(); |
| |
| PartitionSchema partition_schema; |
| CHECK_OK(PartitionSchema::FromPB(PartitionSchemaPB(), schema, &partition_schema)); |
| |
| KuduPartialRow start_row(&schema); |
| ASSERT_OK(start_row.SetInt32("key", 10)); |
| |
| KuduPartialRow end_row(&schema); |
| ASSERT_OK(end_row.SetInt32("key", 20)); |
| |
| vector<Partition> partitions; |
| ASSERT_OK(partition_schema.CreatePartitions({ start_row, end_row }, {}, schema, &partitions)); |
| |
| ASSERT_EQ(3, partitions.size()); |
| |
| ASSERT_OK(mini_server_->server()->tablet_manager()->CreateNewTablet( |
| "TestWriteOutOfBoundsTable", tabletId, |
| partitions[1], |
| tabletId, schema, partition_schema, |
| mini_server_->CreateLocalConfig(), nullopt, nullopt, nullopt, nullptr)); |
| |
| ASSERT_OK(WaitForTabletRunning(tabletId)); |
| |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| RpcController controller; |
| req.set_tablet_id(tabletId); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| vector<RowOperationsPB::Type> ops = { RowOperationsPB::INSERT, RowOperationsPB::UPDATE }; |
| |
| for (const RowOperationsPB::Type &op : ops) { |
| RowOperationsPB* data = req.mutable_row_operations(); |
| AddTestRowToPB(op, schema_, 20, 1, "1", data); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, resp.error().code()); |
| Status s = StatusFromPB(resp.error().status()); |
| EXPECT_TRUE(s.IsNotFound()); |
| ASSERT_STR_CONTAINS(s.ToString(), |
| "Not found: Row not in tablet partition"); |
| data->Clear(); |
| controller.Reset(); |
| } |
| } |
| |
| static uint32_t CalcTestRowChecksum(int32_t key, uint8_t string_field_defined = true) { |
| crc::Crc* crc = crc::GetCrc32cInstance(); |
| uint64_t row_crc = 0; |
| |
| string strval = strings::Substitute("original$0", key); |
| uint32_t index = 0; |
| crc->Compute(&index, sizeof(index), &row_crc, nullptr); |
| crc->Compute(&key, sizeof(int32_t), &row_crc, nullptr); |
| |
| index = 1; |
| crc->Compute(&index, sizeof(index), &row_crc, nullptr); |
| crc->Compute(&key, sizeof(int32_t), &row_crc, nullptr); |
| |
| index = 2; |
| crc->Compute(&index, sizeof(index), &row_crc, nullptr); |
| crc->Compute(&string_field_defined, sizeof(string_field_defined), &row_crc, nullptr); |
| if (string_field_defined) { |
| crc->Compute(strval.c_str(), strval.size(), &row_crc, nullptr); |
| } |
| return static_cast<uint32_t>(row_crc); |
| } |
| |
| // Simple test to check that our checksum scans work as expected. |
| TEST_F(TabletServerTest, TestChecksumScan) { |
| uint64_t total_crc = 0; |
| |
| ChecksumRequestPB req; |
| req.mutable_new_request()->set_tablet_id(kTabletId); |
| req.mutable_new_request()->set_read_mode(READ_LATEST); |
| req.set_call_seq_id(0); |
| ASSERT_OK(SchemaToColumnPBs(schema_, req.mutable_new_request()->mutable_projected_columns(), |
| SCHEMA_PB_WITHOUT_IDS)); |
| ChecksumRequestPB new_req = req; // Cache "new" request. |
| |
| ChecksumResponsePB resp; |
| RpcController controller; |
| ASSERT_OK(proxy_->Checksum(req, &resp, &controller)); |
| |
| // No rows. |
| ASSERT_EQ(total_crc, resp.checksum()); |
| ASSERT_FALSE(resp.has_more_results()); |
| |
| // First row. |
| int32_t key = 1; |
| InsertTestRowsRemote(key, 1); |
| controller.Reset(); |
| ASSERT_OK(proxy_->Checksum(req, &resp, &controller)); |
| total_crc += CalcTestRowChecksum(key); |
| uint64_t first_crc = total_crc; // Cache first record checksum. |
| |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp.error()); |
| ASSERT_EQ(total_crc, resp.checksum()); |
| ASSERT_FALSE(resp.has_more_results()); |
| EXPECT_TRUE(resp.has_resource_metrics()); |
| EXPECT_EQ(1, resp.rows_checksummed()); |
| |
| // Second row (null string field). |
| key = 2; |
| InsertTestRowsRemote(key, 1, 1, nullptr, kTabletId, nullptr, nullptr, false); |
| controller.Reset(); |
| ASSERT_OK(proxy_->Checksum(req, &resp, &controller)); |
| total_crc += CalcTestRowChecksum(key, false); |
| |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp.error()); |
| ASSERT_EQ(total_crc, resp.checksum()); |
| ASSERT_FALSE(resp.has_more_results()); |
| |
| // Now test the same thing, but with a scan requiring 2 passes (one per row). |
| FLAGS_scanner_batch_size_rows = 1; |
| req.set_batch_size_bytes(1); |
| controller.Reset(); |
| ASSERT_OK(proxy_->Checksum(req, &resp, &controller)); |
| string scanner_id = resp.scanner_id(); |
| ASSERT_TRUE(resp.has_more_results()); |
| uint64_t agg_checksum = resp.checksum(); |
| |
| // Second row. |
| req.clear_new_request(); |
| req.mutable_continue_request()->set_scanner_id(scanner_id); |
| req.mutable_continue_request()->set_previous_checksum(agg_checksum); |
| req.set_call_seq_id(1); |
| controller.Reset(); |
| ASSERT_OK(proxy_->Checksum(req, &resp, &controller)); |
| ASSERT_EQ(total_crc, resp.checksum()); |
| ASSERT_FALSE(resp.has_more_results()); |
| |
| // Finally, delete row 2, so we're back to the row 1 checksum. |
| NO_FATALS(DeleteTestRowsRemote(key, 1)); |
| FLAGS_scanner_batch_size_rows = 100; |
| req = new_req; |
| controller.Reset(); |
| ASSERT_OK(proxy_->Checksum(req, &resp, &controller)); |
| ASSERT_NE(total_crc, resp.checksum()); |
| ASSERT_EQ(first_crc, resp.checksum()); |
| ASSERT_FALSE(resp.has_more_results()); |
| } |
| |
| class DelayFsyncLogHook : public log::LogFaultHooks { |
| public: |
| DelayFsyncLogHook() : log_latch1_(1), test_latch1_(1) {} |
| |
| Status PostAppend() override { |
| test_latch1_.CountDown(); |
| log_latch1_.Wait(); |
| log_latch1_.Reset(1); |
| return Status::OK(); |
| } |
| |
| void Continue() { |
| test_latch1_.Wait(); |
| log_latch1_.CountDown(); |
| } |
| |
| private: |
| CountDownLatch log_latch1_; |
| CountDownLatch test_latch1_; |
| }; |
| |
| namespace { |
| |
| void CompactAsync(Tablet* tablet, CountDownLatch* flush_done_latch) { |
| CHECK_OK(tablet->Compact(Tablet::FORCE_COMPACT_ALL)); |
| flush_done_latch->CountDown(); |
| } |
| |
| } // namespace |
| |
| // Tests that in flight ops are committed and that commit messages |
| // are durable before a compaction is allowed to flush the tablet metadata. |
| // |
| // This test is in preparation for KUDU-120 and should pass before and after |
| // it, but was also confirmed to fail if the pre-conditions it tests for |
| // fail. That is if KUDU-120 is implemented without these pre-requisites |
| // this test is confirmed to fail. |
| TEST_F(TabletServerTest, TestKudu120PreRequisites) { |
| |
| // Insert a few rows... |
| InsertTestRowsRemote(0, 10); |
| // ... now flush ... |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| // ... insert a few rows... |
| InsertTestRowsRemote(10, 10); |
| // ... and flush again so that we have two disk row sets. |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| |
| // Add a hook so that we can make the log wait right after an append |
| // (before the callback is triggered). |
| log::Log* log = tablet_replica_->log(); |
| auto log_hook(make_shared<DelayFsyncLogHook>()); |
| log->SetLogFaultHooksForTests(log_hook); |
| |
| // Now start an op (delete) and stop just before commit. |
| thread delete_thread([this]() { this->DeleteTestRowsRemote(10, 1); }); |
| |
| // Wait for the replicate message to arrive and continue. |
| log_hook->Continue(); |
| // Wait a few msecs to make sure that the op is |
| // trying to commit. |
| usleep(100* 1000); // 100 msecs |
| |
| // Now start a compaction before letting the commit message go through. |
| Tablet* tablet = tablet_replica_->tablet(); |
| CountDownLatch flush_done_latch(1); |
| thread flush_thread([tablet, &flush_done_latch]() { |
| CompactAsync(tablet, &flush_done_latch); |
| }); |
| |
| // At this point we have both a compaction and an op going on. |
| // If we allow the op to return before the commit message is |
| // durable (KUDU-120) that means that the mvcc op will no longer |
| // be in flight at this moment, nonetheless since we're blocking the WAL |
| // and not allowing the commit message to go through, the compaction should |
| // be forced to wait. |
| // |
| // We are thus testing two conditions: |
| // - That in-flight ops are committed. |
| // - That commit messages for ops that were in flight are durable. |
| // |
| // If these pre-conditions are not met, i.e. if the compaction is not forced |
| // to wait here for the conditions to be true, then the below assertion |
| // will fail, since the op's commit write callback will only |
| // return when we allow it (in log_hook->Continue()); |
| CHECK(!flush_done_latch.WaitFor(MonoDelta::FromMilliseconds(300))); |
| |
| // Now let the rest go through. |
| log_hook->Continue(); |
| log_hook->Continue(); |
| flush_done_latch.Wait(); |
| flush_thread.join(); |
| delete_thread.join(); |
| } |
| |
| // Test DNS resolution failure in the master heartbeater. |
| // Regression test for KUDU-1681. |
| TEST_F(TabletServerTest, TestFailedDnsResolution) { |
| FLAGS_fail_dns_resolution = true; |
| mini_server_->server()->heartbeater()->TriggerASAP(); |
| // Wait to make sure the heartbeater thread attempts the DNS lookup. |
| usleep(100 * 1000); |
| } |
| |
| TEST_F(TabletServerTest, TestDataDirGroupsCreated) { |
| // Get the original superblock. |
| TabletSuperBlockPB superblock; |
| tablet_replica_->tablet()->metadata()->ToSuperBlock(&superblock); |
| DataDirGroupPB orig_group = superblock.data_dir_group(); |
| |
| // Remove the DataDirGroupPB on-disk. |
| superblock.clear_data_dir_group(); |
| ASSERT_FALSE(superblock.has_data_dir_group()); |
| string tablet_meta_path = JoinPathSegments(GetTestPath("TabletServerTest-fsroot"), "tablet-meta"); |
| string pb_path = JoinPathSegments(tablet_meta_path, tablet_replica_->tablet_id()); |
| ASSERT_OK(pb_util::WritePBContainerToPath(Env::Default(), |
| pb_path, superblock, pb_util::OVERWRITE, pb_util::SYNC, pb_util::SENSITIVE)); |
| |
| // Verify that the on-disk copy has its DataDirGroup missing. |
| ASSERT_OK(tablet_replica_->tablet()->metadata()->ReadSuperBlockFromDisk(&superblock)); |
| ASSERT_FALSE(superblock.has_data_dir_group()); |
| |
| // Restart the server and check that a new group is created. By default, the |
| // group will be created with all data directories and should be identical to |
| // the original one. |
| ASSERT_OK(ShutdownAndRebuildTablet()); |
| tablet_replica_->tablet()->metadata()->ToSuperBlock(&superblock); |
| DataDirGroupPB new_group = superblock.data_dir_group(); |
| MessageDifferencer md; |
| ASSERT_TRUE(md.Compare(orig_group, new_group)); |
| } |
| |
| TEST_F(TabletServerTest, TestNoMetricsForTombstonedTablet) { |
| // Force the metrics to be retired immediately. |
| FLAGS_metrics_retirement_age_ms = 0; |
| |
| scoped_refptr<TabletReplica> tablet; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
| |
| // Insert one row and check the insertion is recorded in the metrics. |
| NO_FATALS(InsertTestRowsRemote(0, 1, 1)); |
| scoped_refptr<Counter> rows_inserted = |
| METRIC_rows_inserted.Instantiate(tablet->tablet()->GetMetricEntity()); |
| int64_t num_rows_running = rows_inserted->value(); |
| ASSERT_EQ(1, num_rows_running); |
| |
| // Tombstone the tablet. |
| DeleteTabletRequestPB req; |
| DeleteTabletResponsePB resp; |
| RpcController rpc; |
| req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid()); |
| req.set_tablet_id(kTabletId); |
| req.set_delete_type(tablet::TABLET_DATA_TOMBSTONED); |
| { |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(admin_proxy_->DeleteTablet(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| } |
| |
| // It takes three calls to /jsonmetricz for the tablet metrics to go away, based on the |
| // policy in MetricRegistry::RetireOldMetrics: |
| // 1. The entity's metrics are returned, but also marked for retirement. |
| // 2. The entity's metrics are returned, but also retired (causing the entity to be retired). |
| // 3. The metrics aren't returned-- the entity has been removed from the metrics registry. |
| EasyCurl c; |
| faststring buf; |
| for (int i = 0; i < 3; i++) { |
| ASSERT_OK(c.FetchURL(strings::Substitute("http://$0/jsonmetricz", |
| mini_server_->bound_http_addr().ToString()), |
| &buf)); |
| if (i < 2) { |
| ASSERT_STR_CONTAINS(buf.ToString(), "\"type\": \"tablet\""); |
| } else { |
| ASSERT_STR_NOT_CONTAINS(buf.ToString(), "\"type\": \"tablet\""); |
| } |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestTabletNumberOfDiskRowSetsMetric) { |
| scoped_refptr<TabletReplica> tablet; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
| ASSERT_TRUE(tablet->tablet()->GetMetricEntity()); |
| |
| // We don't care what the function is, since the metric is already instantiated. |
| auto num_diskrowsets = METRIC_num_rowsets_on_disk.InstantiateFunctionGauge( |
| tablet->tablet()->GetMetricEntity(), []() { return 0; }); |
| |
| // No data, no diskrowsets. |
| ASSERT_EQ(0, num_diskrowsets->value()); |
| |
| // Insert a row and flush. There should be 1 diskrowset. |
| NO_FATALS(InsertTestRowsRemote(0, 1, 1)); |
| ASSERT_OK(tablet->tablet()->Flush()); |
| ASSERT_EQ(1, num_diskrowsets->value()); |
| } |
| |
| // Test ensuring that when rowset min/max keys are stored with and read from |
| // the rowset metadata, the tablet server doesn't read any blocks when |
| // bootstrapping. |
| TEST_F(TabletServerTest, TestKeysInRowsetMetadataPreventStartupSeeks) { |
| // Write the min/max keys to the rowset metadata. This gives us the option to |
| // read from the CFile vs from the rowset metadata. |
| FLAGS_rowset_metadata_store_keys = true; |
| InsertTestRowsDirect(0, 100); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| // Disable the maintenance manager so we don't get any seeks from |
| // maintenance operations when we restart. |
| FLAGS_enable_maintenance_manager = false; |
| |
| const auto restart_server_and_check_bytes_read = [&] (bool keys_in_rowset_meta) { |
| FLAGS_rowset_metadata_store_keys = keys_in_rowset_meta; |
| // Reset the replica to avoid any lingering references. |
| // Restart the server and wait for the tablet to bootstrap. |
| tablet_replica_.reset(); |
| mini_server_->Shutdown(); |
| |
| ASSERT_OK(mini_server_->Restart()); |
| ASSERT_OK(mini_server_->WaitStarted()); |
| |
| scoped_refptr<Counter> bytes_read_metric = |
| METRIC_block_manager_total_bytes_read.Instantiate( |
| mini_server_->server()->metric_entity()); |
| int64_t bm_bytes_read = bytes_read_metric->value(); |
| if (keys_in_rowset_meta) { |
| ASSERT_EQ(0, bm_bytes_read); |
| } else { |
| ASSERT_LT(0, bm_bytes_read); |
| } |
| }; |
| |
| // Test both reading and not reading the keys from the rowset metadata, |
| // making sure we read bytes in the block manager only when expected (no |
| // bytes should be read by the BM if storing keys in the rowset metadata). |
| restart_server_and_check_bytes_read(/*keys_in_rowset_meta=*/ false); |
| restart_server_and_check_bytes_read(/*keys_in_rowset_meta=*/ true); |
| } |
| |
| // Test that each scanner can only be accessed by the user who created it. |
| TEST_F(TabletServerTest, TestScannerCheckMatchingUser) { |
| rpc::UserCredentials user; |
| user.set_real_user("good-guy"); |
| proxy_->set_user_credentials(user); |
| |
| InsertTestRowsDirect(0, 100); |
| ScanResponsePB resp; |
| NO_FATALS(OpenScannerWithAllColumns(&resp)); |
| const string& scanner_id = resp.scanner_id(); |
| ASSERT_TRUE(!scanner_id.empty()); |
| |
| // Now do a checksum scan as the user. |
| string checksum_scanner_id; |
| uint64_t checksum_val; |
| { |
| ChecksumRequestPB checksum_req; |
| ChecksumResponsePB checksum_resp; |
| RpcController rpc; |
| ASSERT_OK(FillNewScanRequest(READ_LATEST, checksum_req.mutable_new_request())); |
| // Set a batch size of 0 so we don't return rows and can expect the scanner |
| // to remain alive. |
| checksum_req.set_batch_size_bytes(0); |
| ASSERT_OK(proxy_->Checksum(checksum_req, &checksum_resp, &rpc)); |
| SCOPED_TRACE(checksum_resp.DebugString()); |
| ASSERT_FALSE(checksum_resp.has_error()); |
| ASSERT_TRUE(checksum_resp.has_more_results()); |
| checksum_scanner_id = checksum_resp.scanner_id(); |
| checksum_val = checksum_resp.checksum(); |
| } |
| |
| const auto verify_authz_error = [] (const Status& s) { |
| EXPECT_TRUE(s.IsRemoteError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Not authorized"); |
| }; |
| |
| for (const string& other : { "", "bad-guy" }) { |
| TabletServerServiceProxy bad_proxy( |
| client_messenger_, mini_server_->bound_rpc_addr(), |
| mini_server_->bound_rpc_addr().host()); |
| if (!other.empty()) { |
| rpc::UserCredentials other_user; |
| other_user.set_real_user(other); |
| bad_proxy.set_user_credentials(other_user); |
| } |
| // Other users and clients with no credentials will be bounced for scans, |
| // checksum scans, and keep-alive requests. |
| { |
| ScanRequestPB req; |
| RpcController rpc; |
| req.set_scanner_id(scanner_id); |
| Status s = bad_proxy.Scan(req, &resp, &rpc); |
| SCOPED_TRACE(resp.DebugString()); |
| NO_FATALS(verify_authz_error(s)); |
| } |
| { |
| ChecksumRequestPB req; |
| ContinueChecksumRequestPB* continue_req = req.mutable_continue_request(); |
| continue_req->set_scanner_id(checksum_scanner_id); |
| continue_req->set_previous_checksum(checksum_val); |
| ChecksumResponsePB resp; |
| RpcController rpc; |
| Status s = bad_proxy.Checksum(req, &resp, &rpc); |
| SCOPED_TRACE(resp.DebugString()); |
| NO_FATALS(verify_authz_error(s)); |
| } |
| for (const string& id : { scanner_id, checksum_scanner_id }) { |
| ScannerKeepAliveRequestPB req; |
| req.set_scanner_id(id); |
| ScannerKeepAliveResponsePB resp; |
| RpcController rpc; |
| Status s = bad_proxy.ScannerKeepAlive(req, &resp, &rpc); |
| SCOPED_TRACE(resp.DebugString()); |
| NO_FATALS(verify_authz_error(s)); |
| } |
| } |
| } |
| |
| // Regression test for KUDU-3195 that ensures that as long as there are DMSs |
| // older than the flush threshold, we will schedule DMS flushes. |
| TEST_F(TabletServerTest, TestTimeBasedFlushDMS) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| FLAGS_enable_maintenance_manager = false; |
| NO_FATALS(ShutdownAndRebuildTablet()); |
| // We're going to generate a bunch of DMSs, and for that, we need multiple |
| // rowsets, so disable merge compactions. |
| FLAGS_enable_rowset_compaction = false; |
| |
| constexpr const int kNumRowsets = 100; |
| constexpr const int kRowsPerRowset = 1; |
| for (int i = 0; i < kNumRowsets; i++) { |
| NO_FATALS(InsertTestRowsDirect(i * kRowsPerRowset, kRowsPerRowset)); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| } |
| for (int i = 0; i < kNumRowsets * kRowsPerRowset; i++) { |
| NO_FATALS(UpdateTestRowRemote(i, 0)); |
| } |
| ASSERT_FALSE(tablet_replica_->tablet()->DeltaMemRowSetEmpty()); |
| |
| // We want to see how quickly we can flush DMSs, so speed up time-based |
| // flushing. |
| FLAGS_enable_maintenance_manager = true; |
| FLAGS_maintenance_manager_polling_interval_ms = 1; |
| FLAGS_flush_threshold_secs = 1; |
| NO_FATALS(ShutdownAndRebuildTablet()); |
| // We should be able to wait for the flush threshold and very quickly see all |
| // DMSs get flushed. We'll wait 5x the flush threshold, which should be ample |
| // time given how little we're writing. |
| // NOTE: There are 100 DMSs to flush -- we shouldn't be doing anything silly |
| // like waiting a full flush threshold between flushes, which would take 100 |
| // seconds! |
| SleepFor(MonoDelta::FromSeconds(5 * FLAGS_flush_threshold_secs)); |
| ASSERT_TRUE(tablet_replica_->tablet()->DeltaMemRowSetEmpty()); |
| } |
| |
| TEST_F(TabletServerTest, StartupWithConcurrentOpsBenchmark) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| FLAGS_enable_maintenance_manager = false; |
| NO_FATALS(ShutdownAndRebuildTablet()); |
| |
| // Create a bunch of tablets with a bunch of rowsets. |
| ObjectIdGenerator oid; |
| for (int i = 0; i < FLAGS_num_tablets; i++) { |
| const auto tablet_id = oid.Next(); |
| ASSERT_OK(mini_server_->AddTestTablet(kTableId, tablet_id, schema_)); |
| ASSERT_OK(WaitForTabletRunning(tablet_id.c_str())); |
| for (int r = 0; r < FLAGS_num_rowsets_per_tablet; r++) { |
| NO_FATALS(InsertTestRowsDirect(r, 1, tablet_id)); |
| scoped_refptr<TabletReplica> replica; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(tablet_id, &replica)); |
| ASSERT_OK(replica->tablet()->Flush()); |
| } |
| } |
| ShutdownTablet(); |
| // With a large number of tablets and rowsets, the maintenance manager |
| // scheduler thread will spend a lot of time computing the best op to |
| // perform. With many maintenance threads, finding the best op may contend |
| // with on-going compactions. Per KUDU-3149, this shouldn't interfere with op |
| // registration and bootstrapping. |
| FLAGS_enable_maintenance_manager = true; |
| FLAGS_maintenance_manager_polling_interval_ms = 1; |
| FLAGS_maintenance_manager_num_threads = 10; |
| mini_server_.reset(new MiniTabletServer(GetTestPath("TabletServerTest-fsroot"), |
| HostPort("127.0.0.1", 0), 1)); |
| ASSERT_OK(mini_server_->Start()); |
| LOG_TIMING(INFO, "waiting for all bootstraps to finish") { |
| ASSERT_OK(mini_server_->server()->tablet_manager()->WaitForAllBootstrapsToFinish()); |
| } |
| } |
| |
| TEST_F(TabletServerTest, TestStarvePerfImprovementOpsInColdTablet) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| FLAGS_enable_maintenance_manager = true; |
| NO_FATALS(ShutdownAndRebuildTablet()); |
| |
| // Disable flushing and compactions to create overlapping rowsets. |
| FLAGS_enable_flush_memrowset = false; |
| FLAGS_enable_rowset_compaction = false; |
| |
| // Create a cold tablet and insert sets of overlapping rows to it. |
| const char* kColdTablet = "cold_tablet"; |
| ASSERT_OK(mini_server_->AddTestTablet(kTableId, kColdTablet, schema_)); |
| ASSERT_OK(WaitForTabletRunning(kColdTablet)); |
| scoped_refptr<TabletReplica> cold_replica; |
| ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kColdTablet, &cold_replica)); |
| NO_FATALS(InsertTestRowsDirect(0, 1, kColdTablet)); |
| NO_FATALS(InsertTestRowsDirect(2, 1, kColdTablet)); |
| ASSERT_OK(cold_replica->tablet()->Flush()); |
| NO_FATALS(InsertTestRowsDirect(1, 1, kColdTablet)); |
| ASSERT_OK(cold_replica->tablet()->Flush()); |
| |
| // Make MRS flush op starts out with the highest adjusted perf score. |
| FLAGS_enable_flush_memrowset = true; |
| FLAGS_enable_workload_score_for_perf_improvement_ops = true; |
| FLAGS_workload_stats_rate_collection_min_interval_ms = 10; |
| FLAGS_workload_score_upper_bound = 10; |
| // Make maintenance manager find best op more slowly than a new MRS could flush, |
| // so there are always something to do in the hot tablet. |
| FLAGS_flush_threshold_secs = 1; |
| FLAGS_maintenance_manager_inject_latency_ms = 1000; |
| // Make the original perf improvement score of a flush op smaller than that of a compaction op. |
| FLAGS_flush_threshold_mb = 10 * 1024; |
| FLAGS_flush_upper_bound_ms = 10 * 60 * 60 * 1000; |
| |
| // Make the default tablet 'hot'. |
| std::atomic<bool> keep_inserting_and_scanning(true); |
| thread insert_thread([&] { |
| int cur_row = 0; |
| while (keep_inserting_and_scanning) { |
| NO_FATALS(InsertTestRowsDirect(cur_row++, 1)); |
| ScanResponsePB resp; |
| NO_FATALS(OpenScannerWithAllColumns(&resp)); |
| ASSERT_TRUE(!resp.scanner_id().empty()); |
| } |
| }); |
| SCOPED_CLEANUP({ |
| keep_inserting_and_scanning = false; |
| insert_thread.join(); |
| }); |
| |
| // Wait a bit to allow workload stats collection to begin. |
| SleepFor(MonoDelta::FromSeconds(1)); |
| FLAGS_enable_rowset_compaction = true; |
| // Wait a couple seconds to test if we can starve compaction in the cold tablet even |
| // if the compaction op has a higher raw perf improvement score than that of time-based |
| // flushes in the hot tablet. |
| SleepFor(MonoDelta::FromSeconds(5)); |
| ASSERT_EQ(0, cold_replica->tablet()->metrics()->compact_rs_duration->TotalCount()); |
| |
| // Disable workload score, now we can launch compaction in the cold tablet. |
| FLAGS_enable_workload_score_for_perf_improvement_ops = false; |
| // Compaction in the cold tablet will eventually complete. |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(1, cold_replica->tablet()->metrics()->compact_rs_duration->TotalCount()); |
| }); |
| } |
| |
| class OpApplyQueueTest : public TabletServerTestBase { |
| public: |
| // Starts the tablet server, override to start it later. |
| void SetUp() override { |
| |
| // Since scenarios of this test make bursts of requests, set the maximum |
| // length of the service queue to accomodate many requests. |
| FLAGS_rpc_service_queue_length = 1000; |
| |
| // Set the overload threshold for the op apply queue. Since the threshold |
| // is two-fold of the injected latency and number of operations submitted |
| // at once much higher than the number of worker threads in the apply thread |
| // pool, the queue will gradually accumulate operations until becoming |
| // overloaded. |
| FLAGS_tablet_apply_pool_overload_threshold_ms = kInjectedLatencyMs; |
| |
| NO_FATALS(TabletServerTestBase::SetUp()); |
| NO_FATALS(StartTabletServer(/*num_data_dirs=*/1)); |
| } |
| |
| static constexpr const int32_t kInjectedLatencyMs = 100; |
| }; |
| |
| // This is a regression test for KUDU-1587. |
| TEST_F(OpApplyQueueTest, ApplyQueueBackpressure) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| constexpr size_t kNumCalls = 1000; |
| const int num_cpus = base::NumCPUs(); |
| WriteRequestPB req; |
| req.set_tablet_id(kTabletId); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| // Inject latency into WriteOp::Apply(). |
| FLAGS_tablet_inject_latency_on_apply_write_op_ms = kInjectedLatencyMs; |
| |
| // Send many calls to tablet server, not awaiting for responses. |
| // After sending the first chunk, wait for some time to allow for propagating |
| // the operations to the apply queue and to have apply queue times growing. |
| vector<unique_ptr<RpcController>> controllers; |
| vector<unique_ptr<WriteResponsePB>> responses; |
| CountDownLatch latch(kNumCalls); |
| for (size_t idx = 0; idx < kNumCalls; ++idx) { |
| controllers.emplace_back(new RpcController); |
| responses.emplace_back(new WriteResponsePB); |
| |
| req.clear_row_operations(); |
| auto* data = req.mutable_row_operations(); |
| AddTestRowWithNullableStringToPB( |
| RowOperationsPB::INSERT, schema_, idx, idx, nullptr, data); |
| proxy_->AsyncRequest("Write", req, responses.back().get(), controllers.back().get(), |
| [&latch]() { latch.CountDown(); }); |
| if (idx == 10 * num_cpus) { |
| // Allow to realize what current queue times are once ops reached |
| // the apply queue. |
| SleepFor(MonoDelta::FromMilliseconds(2 * kInjectedLatencyMs)); |
| } |
| } |
| |
| // Wait for calls to be processed before capturing the apply queue stats. |
| latch.Wait(); |
| |
| size_t num_ok = 0; |
| size_t num_error = 0; |
| for (const auto& ctl : controllers) { |
| if (ctl->status().ok()) { |
| ++num_ok; |
| } else { |
| ++num_error; |
| } |
| } |
| ASSERT_EQ(kNumCalls, num_ok + num_error); |
| |
| // Not all request should succeed -- due to long apply times, some should be |
| // rejected. |
| EXPECT_GT(num_error, 0); |
| |
| { |
| // No RPC queue overflows are expected. |
| auto rpc_queue_overflows = METRIC_rpcs_queue_overflow.Instantiate( |
| mini_server_->server()->metric_entity()); |
| ASSERT_EQ(0, rpc_queue_overflows->value()); |
| } |
| |
| { |
| // No requests should timeout while in the queue. |
| auto timed_out_in_rpc_queue = METRIC_rpcs_timed_out_in_queue.Instantiate( |
| mini_server_->server()->metric_entity()); |
| ASSERT_EQ(0, timed_out_in_rpc_queue->value()); |
| } |
| |
| { |
| // Some calls should be rejected due to overloaded apply queue, |
| // so the corresponding queue times should not get too close to the value of |
| // total_request * injected_latency / number_of_apply_threads. |
| auto qt = METRIC_op_apply_queue_time.Instantiate( |
| mini_server_->server()->metric_entity()); |
| ostringstream ostr; |
| ostr << qt->prototype()->name() << ":" << endl; |
| const auto* h = qt->histogram(); |
| h->DumpHumanReadable(&ostr); |
| LOG(INFO) << ostr.str(); |
| |
| // These are simple heuristics rather than exact theoretical thresholds. |
| // They depend on the injected latency and the 'overloaded' threshold |
| // for the apply queue. |
| EXPECT_LT(h->MaxValue(), |
| kInjectedLatencyMs * 1000 * kNumCalls * 4 / (4 * num_cpus)); |
| EXPECT_LT(h->ValueAtPercentile(99), |
| kInjectedLatencyMs * 1000 * kNumCalls * 3 / (4 * num_cpus)); |
| EXPECT_LT(h->ValueAtPercentile(75), |
| kInjectedLatencyMs * 1000 * kNumCalls * 2 / (4 * num_cpus)); |
| } |
| |
| { |
| // With current apply latency, the queue should not get too long. |
| auto ql = METRIC_op_apply_queue_length.Instantiate( |
| mini_server_->server()->metric_entity()); |
| ostringstream ostr; |
| ostr << ql->prototype()->name() << ":" << endl; |
| const auto* h = ql->histogram(); |
| h->DumpHumanReadable(&ostr); |
| LOG(INFO) << ostr.str(); |
| |
| // These are simple heuristics as well. They depend on the injected latency |
| // and the 'overloaded' threshold for the apply queue. |
| EXPECT_LT(h->MaxValue(), 3 * kNumCalls / 4); |
| EXPECT_LT(h->MeanValue(), kNumCalls / 2); |
| } |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |