// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <initializer_list>
#include <memory>
#include <numeric>
#include <optional>
#include <ostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/arena.h>
#include <gtest/gtest.h>
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/row_operations.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/util/hdr_histogram.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
using kudu::client::KuduClient;
using kudu::client::KuduSchema;
using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
using kudu::master::ReplicaTypeFilter;
using kudu::pb_util::SecureDebugString;
using kudu::rpc::Messenger;
using kudu::rpc::MessengerBuilder;
using kudu::rpc::RpcController;
using std::ostringstream;
using std::pair;
using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
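// Parameters controlling the benchmark scenarios below: total runtime,
// number of client threads, and number of tablets in the test table.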
DEFINE_int32(benchmark_runtime_secs, 5, "Number of seconds to run the benchmark");
DEFINE_int32(benchmark_num_threads, 16, "Number of threads to run the benchmark");
DEFINE_int32(benchmark_num_tablets, 60, "Number of tablets to create");
DECLARE_double(leader_failure_max_missed_heartbeat_periods);
DECLARE_int32(follower_unavailable_considered_failed_sec);
DECLARE_int32(heartbeat_interval_ms);
DECLARE_int32(max_create_tablets_per_ts);
DECLARE_int32(raft_heartbeat_interval_ms);
DECLARE_int32(rpc_num_service_threads);
DECLARE_int32(rpc_service_queue_length);
DECLARE_int32(table_locations_ttl_ms);
DECLARE_string(location_mapping_cmd);
DECLARE_uint32(table_locations_cache_capacity_mb);
METRIC_DECLARE_entity(server);
METRIC_DECLARE_counter(rpcs_queue_overflow);
METRIC_DECLARE_counter(table_locations_cache_evictions);
METRIC_DECLARE_counter(table_locations_cache_hits);
METRIC_DECLARE_counter(table_locations_cache_inserts);
METRIC_DECLARE_counter(table_locations_cache_lookups);
METRIC_DECLARE_counter(table_locations_cache_misses);
METRIC_DECLARE_gauge_uint64(table_locations_cache_memory_usage);
METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations);
namespace kudu {
namespace master {
// Test the master GetTableLocations RPC. This can't be done in master-test,
// since it requires a running tablet server in order for the catalog manager to
// report tablet locations.
class TableLocationsTest : public KuduTest {
public:
void SetUp() override {
KuduTest::SetUp();
FLAGS_max_create_tablets_per_ts = 1000;
SetUpConfig();
InternalMiniClusterOptions opts;
opts.num_tablet_servers = 3;
cluster_.reset(new InternalMiniCluster(env_, opts));
ASSERT_OK(cluster_->Start());
// Create a client proxy to the master.
MessengerBuilder bld("Client");
ASSERT_OK(bld.Build(&client_messenger_));
const auto& addr = cluster_->mini_master()->bound_rpc_addr();
proxy_.reset(new MasterServiceProxy(client_messenger_, addr, addr.host()));
}
void TearDown() override {
if (cluster_) {
cluster_->Shutdown();
cluster_.reset();
}
KuduTest::TearDown();
}
virtual void SetUpConfig() {}
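// A single dimension of a hash schema: the columns to hash, the number of
// hash buckets, and the seed for the hash function.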
struct HashDimension {
vector<string> columns;
int32_t num_buckets;
uint32_t seed;
};
typedef vector<HashDimension> HashSchema;
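// A range partition with a custom hash schema: the range's lower and upper
// bounds plus the hash schema to use for rows within the range.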
struct RangeWithHashSchema {
KuduPartialRow lower;
KuduPartialRow upper;
HashSchema hash_schema;
};
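// Send a CreateTable request to the master, encoding the specified split
// rows, range bounds, per-range hash schemas, and table-wide hash schema.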
Status CreateTable(
const string& table_name,
const Schema& schema,
const vector<KuduPartialRow>& split_rows = {},
const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds = {},
const vector<RangeWithHashSchema>& ranges_with_hash_schemas = {},
const HashSchema& table_hash_schema = {});
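// Create a table on a single INT32 key column with 'num_splits' split rows
// (i.e. num_splits + 1 range partitions) and wait until the master reports
// locations for all the tablets.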
void CreateTable(const string& table_name, int num_splits);
// Check that the master doesn't give back partial results while the table is being created.
void CheckMasterTableCreation(const string &table_name, int tablet_locations_size);
shared_ptr<Messenger> client_messenger_;
unique_ptr<InternalMiniCluster> cluster_;
unique_ptr<MasterServiceProxy> proxy_;
};
Status TableLocationsTest::CreateTable(
const string& table_name,
const Schema& schema,
const vector<KuduPartialRow>& split_rows,
const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds,
const vector<RangeWithHashSchema>& ranges_with_hash_schemas,
const HashSchema& table_hash_schema) {
CreateTableRequestPB req;
req.set_name(table_name);
RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
RowOperationsPBEncoder splits_encoder(req.mutable_split_rows_range_bounds());
for (const KuduPartialRow& row : split_rows) {
splits_encoder.Add(RowOperationsPB::SPLIT_ROW, row);
}
for (const auto& bound : bounds) {
splits_encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, bound.first);
splits_encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, bound.second);
}
auto* ps_pb = req.mutable_partition_schema();
for (const auto& range_and_hash_schema : ranges_with_hash_schemas) {
auto* range = ps_pb->add_custom_hash_schema_ranges();
RowOperationsPBEncoder encoder(range->mutable_range_bounds());
encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, range_and_hash_schema.lower);
encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, range_and_hash_schema.upper);
for (const auto& hash_dimension : range_and_hash_schema.hash_schema) {
auto* hash_dimension_pb = range->add_hash_schema();
for (const string& col_name : hash_dimension.columns) {
hash_dimension_pb->add_columns()->set_name(col_name);
}
hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
hash_dimension_pb->set_seed(hash_dimension.seed);
}
}
if (!table_hash_schema.empty()) {
for (const auto& hash_dimension : table_hash_schema) {
auto* hash_schema_pb = ps_pb->add_hash_schema();
for (const string& col_name : hash_dimension.columns) {
hash_schema_pb->add_columns()->set_name(col_name);
}
hash_schema_pb->set_num_buckets(hash_dimension.num_buckets);
hash_schema_pb->set_seed(hash_dimension.seed);
}
}
CreateTableResponsePB resp;
RpcController controller;
RETURN_NOT_OK(proxy_->CreateTable(req, &resp, &controller));
if (resp.has_error()) {
RETURN_NOT_OK(StatusFromPB(resp.error().status()));
}
return Status::OK();
}
void TableLocationsTest::CreateTable(const string& table_name, int num_splits) {
Schema schema({ ColumnSchema("key", INT32) }, 1);
KuduPartialRow row(&schema);
vector<KuduPartialRow> splits(num_splits, row);
for (int i = 0; i < num_splits; i++) {
ASSERT_OK(splits[i].SetInt32(0, i*1000));
}
ASSERT_OK(CreateTable(table_name, schema, splits));
NO_FATALS(CheckMasterTableCreation(table_name, num_splits + 1));
}
void TableLocationsTest::CheckMasterTableCreation(const string &table_name,
int tablet_locations_size) {
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
RpcController controller;
req.set_max_returned_locations(1000);
req.mutable_table()->set_table_name(table_name);
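// Poll the master with quadratic backoff until the tablets are reported as
// running, giving up after 10 attempts.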
for (int i = 1; ; i++) {
if (i > 10) {
FAIL() << "Create table timed out";
}
controller.Reset();
ASSERT_OK(proxy_->GetTableLocations(req, &resp, &controller));
SCOPED_TRACE(SecureDebugString(resp));
if (resp.has_error()) {
ASSERT_EQ(MasterErrorPB::TABLET_NOT_RUNNING, resp.error().code());
SleepFor(MonoDelta::FromMilliseconds(i * i * 100));
} else {
ASSERT_EQ(tablet_locations_size, resp.tablet_locations().size());
break;
}
}
}
// Test that the tablet server location is properly set in the master GetTableLocations RPC.
class TableLocationsWithTSLocationTest : public TableLocationsTest {
public:
void SetUpConfig() override {
const string location_cmd_path = JoinPathSegments(GetTestExecutableDirectory(),
"testdata/first_argument.sh");
const string location = "/foo";
FLAGS_location_mapping_cmd = Substitute("$0 $1", location_cmd_path, location);
}
};
// Test that when the client requests table locations for a non-covered
// partition range, the master returns the tablet immediately preceding the
// start partition key, as specified in the non-covering range partitions
// design document.
TEST_F(TableLocationsTest, TestGetTableLocations) {
const string table_name = "test";
Schema schema({ ColumnSchema("key", STRING) }, 1);
KuduPartialRow row(&schema);
vector<KuduPartialRow> splits(6, row);
ASSERT_OK(splits[0].SetStringNoCopy(0, "aa"));
ASSERT_OK(splits[1].SetStringNoCopy(0, "ab"));
ASSERT_OK(splits[2].SetStringNoCopy(0, "ac"));
ASSERT_OK(splits[3].SetStringNoCopy(0, "ca"));
ASSERT_OK(splits[4].SetStringNoCopy(0, "cb"));
ASSERT_OK(splits[5].SetStringNoCopy(0, "cc"));
vector<pair<KuduPartialRow, KuduPartialRow>> bounds(2, { row, row });
ASSERT_OK(bounds[0].first.SetStringNoCopy(0, "a"));
ASSERT_OK(bounds[0].second.SetStringNoCopy(0, "b"));
ASSERT_OK(bounds[1].first.SetStringNoCopy(0, "c"));
ASSERT_OK(bounds[1].second.SetStringNoCopy(0, "d"));
ASSERT_OK(CreateTable(table_name, schema, splits, bounds));
NO_FATALS(CheckMasterTableCreation(table_name, 8));
{ // from "a"
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
RpcController controller;
req.mutable_table()->set_table_name(table_name);
req.set_partition_key_start("a");
req.set_max_returned_locations(3);
ASSERT_OK(proxy_->GetTableLocations(req, &resp, &controller));
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_TRUE(!resp.has_error());
ASSERT_EQ(3, resp.tablet_locations().size());
EXPECT_EQ("a", resp.tablet_locations(0).partition().partition_key_start());
EXPECT_EQ("aa", resp.tablet_locations(1).partition().partition_key_start());
EXPECT_EQ("ab", resp.tablet_locations(2).partition().partition_key_start());
}
{ // from ""
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
RpcController controller;
req.mutable_table()->set_table_name(table_name);
req.set_partition_key_start("");
req.set_max_returned_locations(3);
ASSERT_OK(proxy_->GetTableLocations(req, &resp, &controller));
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_TRUE(!resp.has_error());
ASSERT_EQ(3, resp.tablet_locations().size());
EXPECT_EQ("a", resp.tablet_locations(0).partition().partition_key_start());
EXPECT_EQ("aa", resp.tablet_locations(1).partition().partition_key_start());
EXPECT_EQ("ab", resp.tablet_locations(2).partition().partition_key_start());
auto get_tablet_location = [](MasterServiceProxy* proxy, const string& tablet_id) {
rpc::RpcController rpc_old;
GetTabletLocationsRequestPB req_old;
GetTabletLocationsResponsePB resp_old;
*req_old.add_tablet_ids() = tablet_id;
ASSERT_OK(proxy->GetTabletLocations(req_old, &resp_old, &rpc_old));
const auto& loc_old = resp_old.tablet_locations(0);
ASSERT_GT(loc_old.deprecated_replicas_size(), 0);
ASSERT_EQ(0, loc_old.interned_replicas_size());
rpc::RpcController rpc_new;
GetTabletLocationsRequestPB req_new;
GetTabletLocationsResponsePB resp_new;
*req_new.add_tablet_ids() = tablet_id;
req_new.set_intern_ts_infos_in_response(true);
ASSERT_OK(proxy->GetTabletLocations(req_new, &resp_new, &rpc_new));
const auto& loc_new = resp_new.tablet_locations(0);
ASSERT_GT(loc_new.interned_replicas_size(), 0);
ASSERT_EQ(0, loc_new.deprecated_replicas_size());
ASSERT_EQ(loc_old.tablet_id(), loc_new.tablet_id());
};
// Check that a UUID was returned for every replica
for (const auto& loc : resp.tablet_locations()) {
NO_FATALS(get_tablet_location(proxy_.get(), loc.tablet_id()));
ASSERT_GT(loc.deprecated_replicas_size(), 0);
ASSERT_EQ(0, loc.interned_replicas_size());
for (const auto& replica : loc.deprecated_replicas()) {
ASSERT_NE("", replica.ts_info().permanent_uuid());
}
}
}
{ // from "", with TSInfo interning enabled.
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
RpcController controller;
req.mutable_table()->set_table_name(table_name);
req.set_partition_key_start("");
req.set_max_returned_locations(3);
req.set_intern_ts_infos_in_response(true);
ASSERT_OK(proxy_->GetTableLocations(req, &resp, &controller));
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_TRUE(!resp.has_error());
ASSERT_EQ(3, resp.tablet_locations().size());
EXPECT_EQ("a", resp.tablet_locations(0).partition().partition_key_start());
EXPECT_EQ("aa", resp.tablet_locations(1).partition().partition_key_start());
EXPECT_EQ("ab", resp.tablet_locations(2).partition().partition_key_start());
// Check that a UUID was returned for every replica
for (const auto& loc : resp.tablet_locations()) {
ASSERT_EQ(loc.deprecated_replicas_size(), 0);
ASSERT_GT(loc.interned_replicas_size(), 0);
for (const auto& replica : loc.interned_replicas()) {
int idx = replica.ts_info_idx();
ASSERT_GE(idx, 0);
ASSERT_LT(idx, resp.ts_infos_size());
ASSERT_NE("", resp.ts_infos(idx).permanent_uuid());
}
}
}
{ // from "b"
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
RpcController controller;
req.mutable_table()->set_table_name(table_name);
req.set_partition_key_start("b");
req.set_max_returned_locations(3);
ASSERT_OK(proxy_->GetTableLocations(req, &resp, &controller));
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_TRUE(!resp.has_error());
ASSERT_EQ(3, resp.tablet_locations().size());
EXPECT_EQ("ac", resp.tablet_locations(0).partition().partition_key_start());
EXPECT_EQ("c", resp.tablet_locations(1).partition().partition_key_start());
EXPECT_EQ("ca", resp.tablet_locations(2).partition().partition_key_start());
}
{ // from "z"
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
RpcController controller;
req.mutable_table()->set_table_name(table_name);
req.set_partition_key_start("z");
req.set_max_returned_locations(3);
ASSERT_OK(proxy_->GetTableLocations(req, &resp, &controller));
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_TRUE(!resp.has_error());
ASSERT_EQ(1, resp.tablet_locations().size());
EXPECT_EQ("cc", resp.tablet_locations(0).partition().partition_key_start());
}
{ // Check that the tablet server location is an empty string if not set.
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
RpcController controller;
req.mutable_table()->set_table_name(table_name);
req.set_max_returned_locations(1);
req.set_intern_ts_infos_in_response(true);
ASSERT_OK(proxy_->GetTableLocations(req, &resp, &controller));
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_TRUE(!resp.has_error());
ASSERT_EQ(1, resp.tablet_locations().size());
ASSERT_EQ(3, resp.tablet_locations(0).interned_replicas_size());
const auto& loc = resp.tablet_locations(0);
for (const auto& replica : loc.interned_replicas()) {
EXPECT_EQ("", resp.ts_infos(replica.ts_info_idx()).location());
}
}
}
TEST_F(TableLocationsTest, RangeSpecificHashingSameDimensions) {
const string table_name = CURRENT_TEST_NAME();
Schema schema({
ColumnSchema("str_0", STRING),
ColumnSchema("int_1", INT32),
ColumnSchema("str_2", STRING),
}, 3);
KuduPartialRow row(&schema);
// Use back-to-back and sparse range placement.
vector<pair<KuduPartialRow, KuduPartialRow>> bounds(5, { row, row });
ASSERT_OK(bounds[0].first.SetStringNoCopy(0, "a"));
ASSERT_OK(bounds[0].second.SetStringNoCopy(0, "b"));
ASSERT_OK(bounds[1].first.SetStringNoCopy(0, "b"));
ASSERT_OK(bounds[1].second.SetStringNoCopy(0, "c"));
ASSERT_OK(bounds[2].first.SetStringNoCopy(0, "c"));
ASSERT_OK(bounds[2].second.SetStringNoCopy(0, "d"));
ASSERT_OK(bounds[3].first.SetStringNoCopy(0, "e"));
ASSERT_OK(bounds[3].second.SetStringNoCopy(0, "f"));
ASSERT_OK(bounds[4].first.SetStringNoCopy(0, "g"));
ASSERT_OK(bounds[4].second.SetStringNoCopy(0, "h"));
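// Table-wide hash schema (2 buckets on 'str_2') and per-range hash schemas;
// the last range re-uses the table-wide schema. In total, the ranges yield
// 5 + 4 + 3 + 6 + 2 = 20 tablets.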
const HashSchema table_wide_hash_schema{ { { "str_2" }, 2, 4 } };
const vector<RangeWithHashSchema> ranges_with_hash_schemas{
{ bounds[0].first, bounds[0].second, { { { "str_0", "int_1" }, 5, 0 } } },
{ bounds[1].first, bounds[1].second, { { { "str_0" }, 4, 1 } } },
{ bounds[2].first, bounds[2].second, { { { "int_1" }, 3, 2 } } },
{ bounds[3].first, bounds[3].second, { { { "str_2", "str_0" }, 6, 3 } } },
{ bounds[4].first, bounds[4].second, table_wide_hash_schema },
};
ASSERT_OK(CreateTable(table_name, schema, {}, {},
ranges_with_hash_schemas, table_wide_hash_schema));
NO_FATALS(CheckMasterTableCreation(table_name, 20));
// The default setting for GetTableLocationsRequestPB::max_returned_locations
// is 10, but here it's necessary to fetch all the existing tablets.
GetTableLocationsRequestPB req;
req.mutable_table()->set_table_name(table_name);
req.clear_max_returned_locations();
GetTableLocationsResponsePB resp;
RpcController controller;
ASSERT_OK(proxy_->GetTableLocations(req, &resp, &controller));
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_FALSE(resp.has_error());
ASSERT_EQ(20, resp.tablet_locations().size());
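// Expected start keys of the resulting partitions: each entry pairs the
// 4-byte hash bucket component with the encoded range key component.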
vector<PartitionKey> partition_keys_ref = {
{ string("\0\0\0\0", 4), string("a" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\1", 4), string("a" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\2", 4), string("a" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\3", 4), string("a" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\4", 4), string("a" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\0", 4), string("b" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\1", 4), string("b" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\2", 4), string("b" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\3", 4), string("b" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\0", 4), string("c" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\1", 4), string("c" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\2", 4), string("c" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\0", 4), string("e" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\1", 4), string("e" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\2", 4), string("e" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\3", 4), string("e" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\4", 4), string("e" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\5", 4), string("e" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\0", 4), string("g" "\0\0\0\0\0\0", 7) },
{ string("\0\0\0\1", 4), string("g" "\0\0\0\0\0\0", 7) },
};
// Sort the reference partition keys to match the tablets, which are returned in sorted order.
sort(partition_keys_ref.begin(), partition_keys_ref.end());
ASSERT_EQ(partition_keys_ref.size(), resp.tablet_locations_size());
for (auto i = 0; i < resp.tablet_locations_size(); ++i) {
EXPECT_EQ(partition_keys_ref[i].ToString(),
resp.tablet_locations(i).partition().partition_key_start());
}
}
// TODO(aserbin): update this scenario once varying hash dimensions per range
// are supported
TEST_F(TableLocationsTest, RangeSpecificHashingVaryingDimensions) {
const string table_name = "test";
Schema schema({ ColumnSchema("key", STRING), ColumnSchema("val", STRING) }, 2);
KuduPartialRow row(&schema);
vector<pair<KuduPartialRow, KuduPartialRow>> bounds(2, { row, row });
ASSERT_OK(bounds[0].first.SetStringNoCopy(0, "a"));
ASSERT_OK(bounds[0].second.SetStringNoCopy(0, "b"));
ASSERT_OK(bounds[1].first.SetStringNoCopy(0, "c"));
ASSERT_OK(bounds[1].second.SetStringNoCopy(0, "d"));
const HashSchema table_wide_hash_schema{ { { "val" }, 4, 2 } };
const vector<RangeWithHashSchema> ranges_with_hash_schemas{
{ bounds[0].first, bounds[0].second,
{ { { { "key" }, 3, 0 }, { { "val" }, 2, 1 } } } },
{ bounds[1].first, bounds[1].second, table_wide_hash_schema },
};
const auto s = CreateTable(table_name, schema, {}, {},
ranges_with_hash_schemas, table_wide_hash_schema);
ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
"varying number of hash dimensions per range is not yet supported");
}
TEST_F(TableLocationsWithTSLocationTest, TestGetTSLocation) {
const string table_name = "test";
Schema schema({ ColumnSchema("key", STRING) }, 1);
ASSERT_OK(CreateTable(table_name, schema));
NO_FATALS(CheckMasterTableCreation(table_name, 1));
{
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
RpcController controller;
req.mutable_table()->set_table_name(table_name);
req.set_intern_ts_infos_in_response(true);
ASSERT_OK(proxy_->GetTableLocations(req, &resp, &controller));
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_TRUE(!resp.has_error());
ASSERT_EQ(1, resp.tablet_locations().size());
ASSERT_EQ(3, resp.tablet_locations(0).interned_replicas_size());
const auto& loc = resp.tablet_locations(0);
for (const auto& replica : loc.interned_replicas()) {
ASSERT_EQ("/foo", resp.ts_infos(replica.ts_info_idx()).location());
}
}
}
TEST_F(TableLocationsTest, GetTableLocationsBenchmark) {
SKIP_IF_SLOW_NOT_ALLOWED();
const int kNumSplits = FLAGS_benchmark_num_tablets - 1;
const int kNumThreads = FLAGS_benchmark_num_threads;
const auto kRuntime = MonoDelta::FromSeconds(FLAGS_benchmark_runtime_secs);
const string table_name = "test";
NO_FATALS(CreateTable(table_name, kNumSplits));
// Make one proxy per thread, so each thread gets its own messenger and
// reactor. If there were only one messenger, then only one reactor thread
// would be used for the connection to the master, so this benchmark would
// probably be testing the serialization and network code rather than the
// master's GetTableLocations code.
vector<unique_ptr<MasterServiceProxy>> proxies;
proxies.reserve(kNumThreads);
for (int i = 0; i < kNumThreads; i++) {
MessengerBuilder bld("Client");
bld.set_num_reactors(1);
shared_ptr<Messenger> msg;
ASSERT_OK(bld.Build(&msg));
const auto& addr = cluster_->mini_master()->bound_rpc_addr();
proxies.emplace_back(new MasterServiceProxy(std::move(msg), addr, addr.host()));
}
std::atomic<bool> stop { false };
vector<thread> threads;
threads.reserve(kNumThreads);
// If a thread encounters an error, the test fails.
vector<uint64_t> err_counters(kNumThreads, 0);
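// Each worker thread issues GetTableLocations requests in a tight loop via
// its own proxy until the main thread signals it to stop.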
for (auto i = 0; i < kNumThreads; ++i) {
threads.emplace_back([&, i]() {
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
while (!stop) {
RpcController controller;
req.Clear();
resp.Clear();
req.mutable_table()->set_table_name(table_name);
req.set_max_returned_locations(1000);
req.set_intern_ts_infos_in_response(true);
const auto s = proxies[i]->GetTableLocations(req, &resp, &controller);
if (!s.ok()) {
LOG(WARNING) << "GetTableLocations() failed: " << s.ToString();
++err_counters[i];
break;
}
CHECK_EQ(resp.tablet_locations_size(), kNumSplits + 1);
}
});
}
SleepFor(kRuntime);
stop = true;
for (auto& t : threads) {
t.join();
}
const auto& ent = cluster_->mini_master()->master()->metric_entity();
auto counter = METRIC_rpcs_queue_overflow.Instantiate(ent);
auto hist = METRIC_handler_latency_kudu_master_MasterService_GetTableLocations
.Instantiate(ent);
cluster_->Shutdown();
LOG(INFO) << "RPC queue overflows: " << counter->value();
const auto errors = accumulate(err_counters.begin(), err_counters.end(), 0UL);
if (errors != 0) {
FAIL() << Substitute("detected $0 errors", errors);
}
LOG(INFO) << Substitute(
"GetTableLocations RPC: $0 req/sec",
hist->histogram()->TotalCount() / kRuntime.ToSeconds());
ostringstream ostr;
ostr << "Stats on GetTableLocations RPC (times in microseconds): ";
hist->histogram()->DumpHumanReadable(&ostr);
LOG(INFO) << ostr.str();
}
// Similar to GetTableLocationsBenchmark, but calls the function directly within
// the mini-cluster process instead of issuing RPCs.
TEST_F(TableLocationsTest, GetTableLocationsBenchmarkFunctionCall) {
SKIP_IF_SLOW_NOT_ALLOWED();
const string kUserName = "testuser";
const size_t kNumSplits = FLAGS_benchmark_num_tablets - 1;
const size_t kNumThreads = FLAGS_benchmark_num_threads;
const auto kRuntime = MonoDelta::FromSeconds(FLAGS_benchmark_runtime_secs);
const string table_name = "test";
NO_FATALS(CreateTable(table_name, kNumSplits));
CatalogManager* cm = cluster_->mini_master()->master()->catalog_manager();
const std::optional<string> username = kUserName;
std::atomic<bool> stop(false);
vector<thread> threads;
threads.reserve(kNumThreads);
vector<uint64_t> req_counters(kNumThreads, 0);
vector<uint64_t> err_counters(kNumThreads, 0);
for (size_t idx = 0; idx < kNumThreads; ++idx) {
threads.emplace_back([&, idx]() {
GetTableLocationsRequestPB req;
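// The response is allocated on a protobuf arena which is reset on every
// iteration, so response allocations don't accumulate across requests.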
google::protobuf::Arena arena;
while (!stop) {
req.Clear();
arena.Reset();
auto* resp = google::protobuf::Arena::CreateMessage<GetTableLocationsResponsePB>(&arena);
req.mutable_table()->set_table_name(table_name);
req.set_max_returned_locations(1000);
req.set_intern_ts_infos_in_response(true);
++req_counters[idx];
{
CatalogManager::ScopedLeaderSharedLock l(cm);
auto s = cm->GetTableLocations(&req, resp, username);
if (!s.ok()) {
++err_counters[idx];
}
}
}
});
}
SleepFor(kRuntime);
stop = true;
for (auto& t : threads) {
t.join();
}
cluster_->Shutdown();
const auto errors = accumulate(err_counters.begin(), err_counters.end(), 0UL);
if (errors != 0) {
FAIL() << Substitute("detected $0 errors", errors);
}
const double total = accumulate(req_counters.begin(), req_counters.end(), 0UL);
LOG(INFO) << Substitute(
"GetTableLocations function call: $0 req/sec",
total / kRuntime.ToSeconds());
}
// A small benchmark to see whether multiple clients refreshing table locations
// might create a thundering herd pattern with their GetTableLocations()
// requests, overflowing the RPC service queue.
class RefreshTableLocationsTest : public KuduTest {
public:
void SetUp() override {
KuduTest::SetUp();
// Set the RPC queue size limit and the number of RPC service threads small
// to register RPC queue overflows, if any. The idea is to make this test
// use smaller numbers, while in real life there would be more clients and
// a regular-length RPC service queue.
FLAGS_rpc_num_service_threads = 2;
FLAGS_rpc_service_queue_length = std::max(1, FLAGS_benchmark_num_threads / 3);
// Make table location information expire faster so the clients try to
// refresh the information during the scenario, without spending too much
// time running the scenario.
FLAGS_table_locations_ttl_ms = 1500;
{
InternalMiniClusterOptions opts;
opts.num_masters = 1;
opts.num_tablet_servers = 3;
cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
}
ASSERT_OK(cluster_->Start());
}
void TearDown() override {
if (cluster_) {
cluster_->Shutdown();
cluster_.reset();
}
KuduTest::TearDown();
}
protected:
unique_ptr<InternalMiniCluster> cluster_;
};
TEST_F(RefreshTableLocationsTest, ThunderingHerd) {
SKIP_IF_SLOW_NOT_ALLOWED();
const auto kNumThreads = FLAGS_benchmark_num_threads;
const auto kRuntime =
MonoDelta::FromMilliseconds(5 * FLAGS_table_locations_ttl_ms);
// Create test tables and populate them with data. These tables are used by
// the threads below, each running its own read test workload.
for (auto idx = 0; idx < kNumThreads; ++idx) {
TestWorkload w(cluster_.get());
w.set_num_replicas(3);
w.set_num_tablets(10);
w.set_table_name(Substitute("test_table_$0", idx));
w.set_num_write_threads(1);
w.set_num_read_threads(0);
w.Setup();
w.Start();
while (w.rows_inserted() < 100) {
SleepFor(MonoDelta::FromMilliseconds(10));
}
w.StopAndJoin();
}
std::atomic<bool> stop(false);
vector<thread> threads;
threads.reserve(kNumThreads);
for (auto idx = 0; idx < kNumThreads; ++idx) {
// It's important to have multiple workloads to simulate multiple clients,
// each with its own table locations cache.
threads.emplace_back([&, idx]() {
TestWorkload w(cluster_.get());
w.set_table_name(Substitute("test_table_$0", idx));
w.set_num_read_threads(1);
w.set_num_write_threads(0);
w.Setup();
w.Start();
while (!stop) {
SleepFor(MonoDelta::FromMilliseconds(10));
}
w.StopAndJoin();
});
}
// Make sure the reader threads refresh the expired table locations
// information a few times during the scenario.
SleepFor(kRuntime);
stop = true;
for (auto& t : threads) {
t.join();
}
const auto& ent = cluster_->mini_master()->master()->metric_entity();
auto counter = METRIC_rpcs_queue_overflow.Instantiate(ent);
auto hist = METRIC_handler_latency_kudu_master_MasterService_GetTableLocations.
Instantiate(ent);
cluster_->Shutdown();
LOG(INFO) << "RPC queue overflows: " << counter->value();
LOG(INFO) << Substitute(
"GetTableLocations RPC: $0 req/sec",
hist->histogram()->TotalCount() / kRuntime.ToSeconds());
ostringstream ostr;
ostr << "Stats on GetTableLocations RPC (times in microseconds): ";
hist->histogram()->DumpHumanReadable(&ostr);
LOG(INFO) << ostr.str();
}
class TableLocationsCacheBaseTest : public KuduTest {
public:
TableLocationsCacheBaseTest(int num_tablet_servers,
int cache_capacity_mb)
: num_tablet_servers_(num_tablet_servers),
cache_capacity_mb_(cache_capacity_mb),
schema_(itest::SimpleIntKeyKuduSchema()) {
}
void SetUp() override {
KuduTest::SetUp();
InternalMiniClusterOptions opts;
opts.num_tablet_servers = num_tablet_servers_;
// Setting the cache's capacity to anything > 0 enables caching of table
// locations.
FLAGS_table_locations_cache_capacity_mb = cache_capacity_mb_;
cluster_.reset(new InternalMiniCluster(env_, opts));
ASSERT_OK(cluster_->Start());
ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(table_creator->table_name(kTableName)
.schema(&schema_)
.set_range_partition_columns({ "key" })
.num_replicas(std::min<int>(3, num_tablet_servers_))
.add_hash_partitions({ "key" }, 5)
.Create());
metric_entity_ = cluster_->mini_master()->master()->metric_entity();
ASSERT_NE(nullptr, metric_entity_.get());
}
void TearDown() override {
if (cluster_) {
cluster_->Shutdown();
cluster_.reset();
}
KuduTest::TearDown();
}
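// Accessors for the master-side table locations cache metrics.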
int64_t GetCacheInserts() const {
return METRIC_table_locations_cache_inserts.Instantiate(metric_entity_)->value();
}
int64_t GetCacheLookups() const {
return METRIC_table_locations_cache_lookups.Instantiate(metric_entity_)->value();
}
int64_t GetCacheHits() const {
return METRIC_table_locations_cache_hits.Instantiate(metric_entity_)->value();
}
int64_t GetCacheMisses() const {
return METRIC_table_locations_cache_misses.Instantiate(metric_entity_)->value();
}
int64_t GetCacheEvictions() const {
return METRIC_table_locations_cache_evictions.Instantiate(metric_entity_)->value();
}
uint64_t GetCacheMemoryUsage() const {
return METRIC_table_locations_cache_memory_usage.Instantiate(metric_entity_, 0)->value();
}
// Insert about 'rows_num' rows into the table named 'table_name', returning
// the actual number of rows written.
int64_t InsertRows(const string& table_name, int64_t rows_num = 10) {
TestWorkload w(cluster_.get());
w.set_table_name(table_name);
w.set_schema(schema_);
w.set_num_read_threads(0);
w.set_num_write_threads(1);
w.set_write_batch_size(1);
w.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
w.set_already_present_allowed(true);
w.Setup();
w.Start();
while (w.rows_inserted() < rows_num) {
SleepFor(MonoDelta::FromMilliseconds(10));
}
w.StopAndJoin();
// Return the actual number of rows inserted.
return w.rows_inserted();
}
protected:
static constexpr const char* const kTableName = "table_locations_cache_test";
const int num_tablet_servers_;
const int cache_capacity_mb_;
const KuduSchema schema_;
unique_ptr<InternalMiniCluster> cluster_;
client::sp::shared_ptr<KuduClient> client_;
scoped_refptr<MetricEntity> metric_entity_;
};
class TableLocationsCacheDisabledTest : public TableLocationsCacheBaseTest {
public:
TableLocationsCacheDisabledTest()
: TableLocationsCacheBaseTest(1 /*num_tablet_servers*/,
0 /*cache_capacity_mb*/) {}
};
TEST_F(TableLocationsCacheDisabledTest, Basic) {
// Verify that the table locations cache is disabled by default.
google::CommandLineFlagInfo flag_info;
ASSERT_TRUE(google::GetCommandLineFlagInfo(
"table_locations_cache_capacity_mb", &flag_info));
ASSERT_TRUE(flag_info.is_default);
ASSERT_EQ("0", flag_info.default_value);
InsertRows(kTableName);
// When disabled, the cache should not be used.
EXPECT_EQ(0, GetCacheEvictions());
EXPECT_EQ(0, GetCacheHits());
EXPECT_EQ(0, GetCacheInserts());
EXPECT_EQ(0, GetCacheLookups());
EXPECT_EQ(0, GetCacheMisses());
EXPECT_EQ(0, GetCacheMemoryUsage());
}
class TableLocationsCacheTest : public TableLocationsCacheBaseTest {
public:
TableLocationsCacheTest()
: TableLocationsCacheBaseTest(1 /*num_tablet_servers*/,
1 /*cache_capacity_mb*/) {}
};
// Verify that requests with different essential parameters produce
// different entries in the table locations cache.
TEST_F(TableLocationsCacheTest, DifferentRequestParameters) {
MessengerBuilder bld("test_builder");
shared_ptr<Messenger> messenger;
ASSERT_OK(bld.Build(&messenger));
const auto& addr = cluster_->mini_master()->bound_rpc_addr();
MasterServiceProxy proxy(messenger, addr, addr.host());
{
// Issue one query many times -- there should be just one record inserted.
const auto prev_cache_inserts = GetCacheInserts();
for (auto i = 0; i < 5; ++i) {
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
req.mutable_table()->set_table_name(kTableName);
req.set_max_returned_locations(1);
RpcController ctl;
ASSERT_OK(proxy.GetTableLocations(req, &resp, &ctl));
ASSERT_TRUE(!resp.has_error());
}
ASSERT_EQ(prev_cache_inserts + 1, GetCacheInserts());
}
{
// Issue a query with a different value of 'max_returned_locations'. A new
// entry should be added to the cache even if the other parameters are the
// same as in the requests sent before.
const auto prev_cache_inserts = GetCacheInserts();
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
req.mutable_table()->set_table_name(kTableName);
req.set_max_returned_locations(10);
RpcController ctl;
ASSERT_OK(proxy.GetTableLocations(req, &resp, &ctl));
ASSERT_TRUE(!resp.has_error());
ASSERT_EQ(prev_cache_inserts + 1, GetCacheInserts());
}
{
// Set 'replica_type_filter' parameter.
const auto prev_cache_inserts = GetCacheInserts();
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
req.mutable_table()->set_table_name(kTableName);
req.set_max_returned_locations(10);
req.set_replica_type_filter(ReplicaTypeFilter::ANY_REPLICA);
RpcController ctl;
ASSERT_OK(proxy.GetTableLocations(req, &resp, &ctl));
ASSERT_TRUE(!resp.has_error());
ASSERT_EQ(prev_cache_inserts + 1, GetCacheInserts());
}
{
// Switch 'replica_type_filter' parameter to a different value.
const auto prev_cache_inserts = GetCacheInserts();
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
req.mutable_table()->set_table_name(kTableName);
req.set_max_returned_locations(10);
req.set_replica_type_filter(ReplicaTypeFilter::VOTER_REPLICA);
RpcController ctl;
ASSERT_OK(proxy.GetTableLocations(req, &resp, &ctl));
ASSERT_TRUE(!resp.has_error());
ASSERT_EQ(prev_cache_inserts + 1, GetCacheInserts());
}
}
// This scenario verifies basic functionality of the table locations cache.
TEST_F(TableLocationsCacheTest, Basic) {
SKIP_IF_SLOW_NOT_ALLOWED();
int64_t prev_cache_hits = 0;
int64_t prev_cache_inserts = 0;
int64_t prev_cache_lookups = 0;
int64_t prev_cache_misses = 0;
int64_t prev_cache_evictions = 0;
{
// Run a workload to prime the table locations cache.
InsertRows(kTableName);
const auto cache_lookups = GetCacheLookups();
EXPECT_GT(cache_lookups, 0);
const auto cache_misses = GetCacheMisses();
const auto cache_inserts = GetCacheInserts();
EXPECT_EQ(cache_misses, cache_inserts);
const auto cache_hits = GetCacheHits();
EXPECT_EQ(cache_lookups - cache_misses, cache_hits);
// The cache usage should be non-zero since some entries are present.
EXPECT_GT(GetCacheMemoryUsage(), 0);
// Store the counters for the next round.
prev_cache_hits = cache_hits;
prev_cache_inserts = cache_inserts;
prev_cache_lookups = cache_lookups;
prev_cache_misses = cache_misses;
prev_cache_evictions = GetCacheEvictions();
}
{
// The cache has been primed -- now run another workload.
InsertRows(kTableName);
// No new cache inserts expected.
EXPECT_EQ(prev_cache_inserts, GetCacheInserts());
// No new cache misses expected.
EXPECT_EQ(prev_cache_misses, GetCacheMisses());
// There should be new cache lookups.
const auto cache_lookups = GetCacheLookups();
EXPECT_LT(prev_cache_lookups, cache_lookups);
// There should be new cache hits.
const auto cache_hits = GetCacheHits();
EXPECT_LT(prev_cache_hits, cache_hits);
// All new lookups should hit the cache.
EXPECT_EQ(cache_lookups - prev_cache_lookups, cache_hits - prev_cache_hits);
}
{
// Alter the test table and make sure the information isn't purged from the
// table locations cache since the table locations haven't changed.
const string new_table_name = string(kTableName) + "_renamed";
unique_ptr<KuduTableAlterer> a0(client_->NewTableAlterer(kTableName));
auto s = a0->RenameTo(new_table_name)->Alter();
ASSERT_OK(s);
// Wait for the master to get notified of the change.
SleepFor(MonoDelta::FromMilliseconds(3 * FLAGS_heartbeat_interval_ms));
// No new cache evictions are expected.
EXPECT_EQ(prev_cache_evictions, GetCacheEvictions());
InsertRows(new_table_name);
EXPECT_EQ(prev_cache_evictions, GetCacheEvictions());
// No new cache inserts nor misses are expected.
EXPECT_EQ(prev_cache_inserts, GetCacheInserts());
EXPECT_EQ(prev_cache_misses, GetCacheMisses());
// Rename the table back.
unique_ptr<KuduTableAlterer> a1(client_->NewTableAlterer(new_table_name));
s = a1->RenameTo(kTableName)->Alter();
ASSERT_OK(s);
}
{
// Drop the test table and make sure the cached location information is
// purged from the table locations cache.
ASSERT_OK(client_->DeleteTable(kTableName));
// Wait for the master to get notified of the change.
SleepFor(MonoDelta::FromMilliseconds(3 * FLAGS_heartbeat_interval_ms));
// All the previously inserted records should be purged.
EXPECT_EQ(GetCacheEvictions(), GetCacheInserts());
EXPECT_EQ(0, GetCacheMemoryUsage());
}
}
class TableLocationsCacheTabletChangeTest :
public TableLocationsCacheBaseTest {
public:
TableLocationsCacheTabletChangeTest()
: TableLocationsCacheBaseTest(5 /*num_tablet_servers*/,
1 /*cache_capacity_mb*/) {}
};
// Verify the behavior of the table locations cache when the table's tablets
// change leadership roles or move around.
TEST_F(TableLocationsCacheTabletChangeTest, Basic) {
SKIP_IF_SLOW_NOT_ALLOWED();
{
// Run a workload to prime the table locations cache.
InsertRows(kTableName);
const auto cache_lookups = GetCacheLookups();
EXPECT_GT(cache_lookups, 0);
const auto cache_misses = GetCacheMisses();
EXPECT_GT(cache_misses, 0);
EXPECT_EQ(cache_misses, GetCacheInserts());
EXPECT_EQ(cache_lookups - cache_misses, GetCacheHits());
}
// Restart tablet servers to reshuffle the leadership roles of tablet replicas.
for (auto idx = 0; idx < cluster_->num_tablet_servers(); ++idx) {
cluster_->mini_tablet_server(idx)->Shutdown();
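// Wait long enough for the remaining replicas to detect the leader failure
// and for the tablet servers to report the new leadership to the master.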
SleepFor(MonoDelta::FromMilliseconds(static_cast<int64_t>(
2 * FLAGS_leader_failure_max_missed_heartbeat_periods *
FLAGS_raft_heartbeat_interval_ms) + 3 * FLAGS_heartbeat_interval_ms));
ASSERT_OK(cluster_->mini_tablet_server(idx)->Restart());
}
{
// All the previously inserted records should be purged.
EXPECT_EQ(GetCacheEvictions(), GetCacheInserts());
const auto prev_cache_inserts = GetCacheInserts();
const auto prev_cache_misses = GetCacheMisses();
InsertRows(kTableName);
// New cache inserts and misses are expected since the previously inserted
// records have just been purged.
EXPECT_LT(prev_cache_inserts, GetCacheInserts());
EXPECT_LT(prev_cache_misses, GetCacheMisses());
}
// Shut down one tablet server and give the system some time to re-replicate
// the affected tablet replicas elsewhere.
FLAGS_follower_unavailable_considered_failed_sec = 1;
cluster_->mini_tablet_server(0)->Shutdown();
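// Wait for leader re-election, for the unavailable follower to be evicted
// and replaced, and for the tablet servers to report the changes to the master.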
SleepFor(MonoDelta::FromMilliseconds(static_cast<int64_t>(
2 * FLAGS_leader_failure_max_missed_heartbeat_periods *
FLAGS_raft_heartbeat_interval_ms +
3 * 1000 * FLAGS_follower_unavailable_considered_failed_sec +
3 * FLAGS_heartbeat_interval_ms)));
// All the previously inserted records should be purged.
EXPECT_EQ(GetCacheEvictions(), GetCacheInserts());
EXPECT_EQ(0, GetCacheMemoryUsage());
}
class TableLocationsCacheMultiMasterTest : public KuduTest {
public:
TableLocationsCacheMultiMasterTest()
: schema_(itest::SimpleIntKeyKuduSchema()) {
}
void SetUp() override {
KuduTest::SetUp();
ExternalMiniClusterOptions opts;
opts.num_masters = 3;
opts.num_tablet_servers = 3;
opts.extra_master_flags = {
"--table_locations_cache_capacity_mb=8",
Substitute("--raft_heartbeat_interval_ms=$0",
kRaftHeartbeatIntervalMs),
Substitute("--leader_failure_max_missed_heartbeat_periods=$0",
kMaxMissedHeartbeatPeriods),
};
cluster_.reset(new cluster::ExternalMiniCluster(std::move(opts)));
ASSERT_OK(cluster_->Start());
client::sp::shared_ptr<client::KuduClient> client;
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
ASSERT_OK(table_creator->table_name(kTableName)
.schema(&schema_)
.set_range_partition_columns({ "key" })
.num_replicas(3)
.add_hash_partitions({ "key" }, 5)
.Create());
}
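// Read the value of the given table locations cache metric from the master
// at 'master_idx' via its HTTP metrics endpoint.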
int64_t GetTableLocationCacheMetric(
int master_idx,
const MetricPrototype* metric_proto) const {
CHECK_LT(master_idx, cluster_->num_masters());
int64_t value;
CHECK_OK(itest::GetInt64Metric(
cluster_->master(master_idx)->bound_http_hostport(),
&METRIC_ENTITY_server, "kudu.master", metric_proto, "value", &value));
return value;
}
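// Aggregate the given metric across all masters in the cluster.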
int64_t GetTableLocationCacheMetricAllMasters(
const MetricPrototype* metric_proto) const {
int64_t total = 0;
for (auto idx = 0; idx < cluster_->num_masters(); ++idx) {
const auto val = GetTableLocationCacheMetric(idx, metric_proto);
total += val;
}
return total;
}
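// Verify that all table locations cache metrics of the master at
// 'master_idx' read zero.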
void CheckCacheMetricsReset(int master_idx) const {
for (const auto* metric : {
&METRIC_table_locations_cache_evictions,
&METRIC_table_locations_cache_hits,
&METRIC_table_locations_cache_inserts,
&METRIC_table_locations_cache_lookups,
&METRIC_table_locations_cache_misses, }) {
SCOPED_TRACE(Substitute(
"verifying value of '$0' metric ($1) for master at index $2",
metric->name(), metric->label(), master_idx));
const auto val = GetTableLocationCacheMetric(master_idx, metric);
EXPECT_EQ(0, val);
}
const auto cache_memory_usage = GetTableLocationCacheMetric(
master_idx, &METRIC_table_locations_cache_memory_usage);
EXPECT_EQ(0, cache_memory_usage);
}
protected:
static constexpr const char* const kTableName = "test_locations_cache_multi_master";
static constexpr int32_t kRaftHeartbeatIntervalMs = 300;
static constexpr int32_t kMaxMissedHeartbeatPeriods = 2;
const KuduSchema schema_;
std::unique_ptr<cluster::ExternalMiniCluster> cluster_;
};
// Verify that the table locations cache is reset once a master assumes
// its leadership role.
TEST_F(TableLocationsCacheMultiMasterTest, ResetCache) {
SKIP_IF_SLOW_NOT_ALLOWED();
// Make sure the cache's metrics are zeroed when there hasn't been any client
// activity yet.
for (auto idx = 0; idx < cluster_->num_masters(); ++idx) {
NO_FATALS(CheckCacheMetricsReset(idx));
}
TestWorkload w(cluster_.get());
w.set_table_name(kTableName);
w.set_schema(schema_);
w.Setup();
w.Start();
while (w.rows_inserted() < 10) {
SleepFor(MonoDelta::FromMilliseconds(10));
}
w.StopAndJoin();
// Make sure some items are added into the cache.
EXPECT_GT(GetTableLocationCacheMetricAllMasters(
&METRIC_table_locations_cache_inserts), 0);
int former_leader_master_idx;
ASSERT_OK(cluster_->GetLeaderMasterIndex(&former_leader_master_idx));
int leader_master_idx = -1;
ASSERT_EVENTUALLY([&] {
// Induce a change of the masters' leadership.
ASSERT_OK(cluster_->master(former_leader_master_idx)->Pause());
// Make one master stop sending heartbeats, and give the rest about three
// heartbeat periods to elect a new leader (including an extra margin to keep
// the scenario stable).
SleepFor(MonoDelta::FromMilliseconds(
2 * (kRaftHeartbeatIntervalMs * kMaxMissedHeartbeatPeriods +
kRaftHeartbeatIntervalMs * 3)));
ASSERT_OK(cluster_->master(former_leader_master_idx)->Resume());
ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx));
ASSERT_NE(former_leader_master_idx, leader_master_idx);
});
// An extra sanity check.
ASSERT_NE(-1, leader_master_idx);
// Make sure all the cache's metrics are reset right after the master has
// become the leader.
NO_FATALS(CheckCacheMetricsReset(leader_master_idx));
}
} // namespace master
} // namespace kudu