| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <functional> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/consensus.proxy.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_verifier.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/master/sys_catalog.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/tablet/key_value_test_schema.h" |
| #include "kudu/transactions/txn_system_client.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
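// Run-time flag toggled by the scenarios below to force closing and
// re-opening of idle outbound connections: with it enabled, every subsequent
// RPC involves authn token verification during connection negotiation.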
| DECLARE_bool(rpc_reopen_outbound_connections); |
| |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::ExternalMiniCluster; |
| using kudu::cluster::ExternalMiniClusterOptions; |
| using kudu::transactions::TxnSystemClient; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace client { |
| |
| namespace { |
| |
// Create a table with the specified name (replication factor of 1) and the
// specified number of hash partitions.
| Status CreateTable(KuduClient* client, |
| const string& table_name, |
| const KuduSchema& schema, |
| int num_hash_partitions) { |
| unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator()); |
| return table_creator->table_name(table_name) |
| .schema(&schema) |
| .add_hash_partitions({ "key" }, num_hash_partitions) |
| .num_replicas(1) |
| .Create(); |
| } |
| |
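// Build an insert operation for the given table: both the key and the value
// columns of the row are derived from the supplied index.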
| unique_ptr<KuduInsert> BuildTestRow(KuduTable* table, int index) { |
| unique_ptr<KuduInsert> insert(table->NewInsert()); |
| KuduPartialRow* row = insert->mutable_row(); |
| CHECK_OK(row->SetInt32(0, index)); |
| CHECK_OK(row->SetInt32(1, index * 2)); |
| return insert; |
| } |
| |
// Insert the given number of test rows into the specified table in the
// context of a new session.
| Status InsertTestRows(KuduClient* client, KuduTable* table, |
| int num_rows, int first_row = 0) { |
| shared_ptr<KuduSession> session = client->NewSession(); |
| RETURN_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC)); |
| session->SetTimeoutMillis(60000); |
| for (int i = first_row; i < num_rows + first_row; ++i) { |
| unique_ptr<KuduInsert> insert(BuildTestRow(table, i)); |
| RETURN_NOT_OK(session->Apply(insert.release())); |
| } |
| return session->Flush(); |
| } |
| |
| } // anonymous namespace |
| |
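// Base class for the scenarios in this file: brings up a Kerberos-enabled
// external mini-cluster with the specified number of masters and tablet
// servers, parameterized by the authn/authz token validity intervals.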
| class AuthTokenExpireITestBase : public KuduTest { |
| public: |
| AuthTokenExpireITestBase(int64_t authn_token_validity_seconds, |
| int64_t authz_token_validity_seconds, |
| int num_masters, |
| int num_tablet_servers) |
| : authn_token_validity_seconds_(authn_token_validity_seconds), |
| authz_token_validity_seconds_(authz_token_validity_seconds), |
| num_masters_(num_masters), |
| num_tablet_servers_(num_tablet_servers), |
| schema_(KuduSchema::FromSchema(CreateKeyValueTestSchema())) { |
| cluster_opts_.num_tablet_servers = num_tablet_servers_; |
| cluster_opts_.num_masters = num_masters_; |
| cluster_opts_.enable_kerberos = true; |
| } |
| |
| void SetUp() override { |
| KuduTest::SetUp(); |
| cluster_.reset(new ExternalMiniCluster(cluster_opts_)); |
| } |
| |
| void TearDown() override { |
| if (cluster_) { |
| cluster_->Shutdown(); |
| } |
| KuduTest::TearDown(); |
| } |
| |
| protected: |
| const int64_t authn_token_validity_seconds_; |
| const int64_t authz_token_validity_seconds_; |
| const int num_masters_; |
| const int num_tablet_servers_; |
| KuduSchema schema_; |
| ExternalMiniClusterOptions cluster_opts_; |
| shared_ptr<ExternalMiniCluster> cluster_; |
| }; |
| |
| |
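// Single-master scenarios: these rely on very short token validity intervals
// and a high ratio of injected invalid authn/authz token errors.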
| class AuthTokenExpireITest : public AuthTokenExpireITestBase { |
| public: |
| explicit AuthTokenExpireITest(int64_t authn_token_validity_seconds = 2, |
| int64_t authz_token_validity_seconds = 2) |
| : AuthTokenExpireITestBase(authn_token_validity_seconds, |
| authz_token_validity_seconds, |
| /*num_masters=*/ 1, |
| /*num_tablet_servers=*/ 3) { |
    // Masters and tservers inject FATAL_INVALID_AUTHENTICATION_TOKEN errors.
    // The client should retry the affected operations and eventually succeed
    // even with the high ratio of injected errors.
| cluster_opts_.extra_master_flags = { |
| "--rpc_inject_invalid_authn_token_ratio=0.5", |
| |
      // In addition to the very short authn token TTL, rotate the token
      // signing keys as often as possible: we want to cover TSK
      // propagation-related scenarios as well (i.e. possible ERROR_UNAVAILABLE
      // errors from tservers) upon re-acquisition of a new authn token and
      // retried RPCs.
| "--tsk_rotation_seconds=1", |
| Substitute("--authn_token_validity_seconds=$0", authn_token_validity_seconds_), |
| Substitute("--authz_token_validity_seconds=$0", authz_token_validity_seconds_), |
| }; |
| |
| cluster_opts_.extra_tserver_flags = { |
| "--rpc_inject_invalid_authn_token_ratio=0.5", |
| |
      // Tservers inject ERROR_INVALID_AUTHORIZATION_TOKEN errors, which lead
      // the client to retry the operation after fetching a new authz token
      // from the master.
| "--tserver_inject_invalid_authz_token_ratio=0.5", |
| |
| "--tserver_enforce_access_control=true", |
| |
| // Decreasing TS->master heartbeat interval speeds up the test. |
| "--heartbeat_interval_ms=10", |
| }; |
| } |
| |
| void SetUp() override { |
| AuthTokenExpireITestBase::SetUp(); |
| ASSERT_OK(cluster_->Start()); |
| } |
| }; |
| |
| |
// Make sure the authn token is re-acquired in certain scenarios upon
// restarting tablet servers.
| TEST_F(AuthTokenExpireITest, RestartTabletServers) { |
| const string table_name = "authn-token-expire-restart-tablet-servers"; |
| |
| // Create and open one table, keeping it open over the component restarts. |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &client)); |
| ASSERT_OK(CreateTable(client.get(), table_name, schema_, num_tablet_servers_)); |
| shared_ptr<KuduTable> table; |
| ASSERT_OK(client->OpenTable(table_name, &table)); |
| ASSERT_OK(InsertTestRows(client.get(), table.get(), |
| num_tablet_servers_, num_tablet_servers_ * 0)); |
| |
| // Restart all tablet servers. |
| for (int i = 0; i < cluster_->num_tablet_servers(); ++i) { |
| auto server = cluster_->tablet_server(i); |
| ASSERT_NE(nullptr, server); |
| server->Shutdown(); |
| ASSERT_OK(server->Restart()); |
| } |
| SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1)); |
| |
| ASSERT_OK(InsertTestRows(client.get(), table.get(), |
| num_tablet_servers_, num_tablet_servers_ * 1)); |
| SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1)); |
| // Make sure to insert a row into all tablets to make an RPC call to every |
| // tablet server hosting the table. |
| ASSERT_OK(InsertTestRows(client.get(), table.get(), |
| num_tablet_servers_, num_tablet_servers_ * 2)); |
| } |
| |
// Make sure the authn token is re-acquired in certain scenarios upon
// restarting both masters and tablet servers.
| TEST_F(AuthTokenExpireITest, RestartCluster) { |
| const string table_name = "authn-token-expire-restart-cluster"; |
| |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &client)); |
| ASSERT_OK(CreateTable(client.get(), table_name, schema_, num_tablet_servers_)); |
| shared_ptr<KuduTable> table; |
| ASSERT_OK(client->OpenTable(table_name, &table)); |
| ASSERT_OK(InsertTestRows(client.get(), table.get(), |
| num_tablet_servers_, num_tablet_servers_ * 0)); |
| |
| // Restart all Kudu server-side components: masters and tablet servers. |
| cluster_->Shutdown(); |
| ASSERT_OK(cluster_->Restart()); |
| SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1)); |
| |
| ASSERT_OK(InsertTestRows(client.get(), table.get(), |
| num_tablet_servers_, num_tablet_servers_ * 1)); |
| SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1)); |
| // Make sure to insert a row into all tablets to make an RPC call to every |
| // tablet server hosting the table. |
| ASSERT_OK(InsertTestRows(client.get(), table.get(), |
| num_tablet_servers_, num_tablet_servers_ * 2)); |
| } |
| |
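// Token validity intervals (in seconds) used to parameterize the
// AuthTokenExpireDuringWorkloadITest scenarios below.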
| struct AuthTokenParams { |
| int64_t authn_validity_secs; |
| int64_t authz_validity_secs; |
| }; |
| |
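// Cover the cases of equal validity intervals, authn tokens outliving authz
// tokens, and vice versa.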
| constexpr AuthTokenParams kEvenValidity = { 5, 5 }; |
| constexpr AuthTokenParams kLongerAuthn = { 5, 3 }; |
| constexpr AuthTokenParams kLongerAuthz = { 3, 5 }; |
| |
| class AuthTokenExpireDuringWorkloadITest : public AuthTokenExpireITest, |
| public ::testing::WithParamInterface<AuthTokenParams> { |
| public: |
| AuthTokenExpireDuringWorkloadITest() |
| : AuthTokenExpireITest(GetParam().authn_validity_secs, GetParam().authz_validity_secs) { |
| // Close an already established idle connection to the server and open |
| // a new one upon making another call to the same server. This is to force |
| // authn token verification at every RPC. |
| FLAGS_rpc_reopen_outbound_connections = true; |
| } |
| |
| void SetUp() override { |
| AuthTokenExpireITestBase::SetUp(); |
    // Do not start the cluster as a part of the setup phase: there is no point
    // wasting time on that because the scenario contains a test which is
    // marked slow and will be skipped if the KUDU_ALLOW_SLOW_TESTS environment
    // variable is not set.
| } |
  const int64_t max_token_validity_ = std::max(GetParam().authn_validity_secs,
                                               GetParam().authz_validity_secs);
| }; |
| |
| INSTANTIATE_TEST_CASE_P(ValidityIntervals, AuthTokenExpireDuringWorkloadITest, |
| ::testing::Values(kEvenValidity, kLongerAuthn, kLongerAuthz)); |
| |
// Run a mixed write/read test workload and check that the client retries upon
// receiving the appropriate invalid token errors, eventually succeeding with
// every issued RPC.
| TEST_P(AuthTokenExpireDuringWorkloadITest, InvalidTokenDuringMixedWorkload) { |
| static const int32_t kTimeoutMs = 10 * 60 * 1000; |
| |
  SKIP_IF_SLOW_NOT_ALLOWED();
| |
| ASSERT_OK(cluster_->Start()); |
| |
| TestWorkload w(cluster_.get()); |
| w.set_client_default_admin_operation_timeout_millis(kTimeoutMs); |
| w.set_client_default_rpc_timeout_millis(kTimeoutMs); |
| w.set_num_replicas(num_tablet_servers_); |
| w.set_num_read_threads(3); |
| w.set_read_timeout_millis(kTimeoutMs); |
| w.set_num_write_threads(3); |
| w.set_write_batch_size(64); |
| w.set_write_timeout_millis(kTimeoutMs); |
| w.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); |
| |
| w.Setup(); |
| w.Start(); |
  SleepFor(MonoDelta::FromSeconds(8 * max_token_validity_));
| w.StopAndJoin(); |
| |
| ClusterVerifier v(cluster_.get()); |
| v.SetOperationsTimeout(MonoDelta::FromSeconds(5 * 60)); |
| NO_FATALS(v.CheckRowCount( |
| w.table_name(), ClusterVerifier::EXACTLY, w.rows_inserted())); |
| ASSERT_OK(w.Cleanup()); |
| |
| NO_FATALS(cluster_->AssertNoCrashes()); |
| } |
| |
// Run write-only and scan-only workloads and check that the client retries
// upon receiving the appropriate invalid token errors, eventually succeeding
// with its RPCs. There is also a test for the mixed workload (see above), but
// that one treats the implementation as a black box: it's impossible to
// guarantee that the read paths are not affected by the write paths, since
// the mixed workload uses the same shared client instance for both.
| TEST_P(AuthTokenExpireDuringWorkloadITest, InvalidTokenDuringSeparateWorkloads) { |
| const string table_name = "authn-token-expire-separate-workloads"; |
| static const int32_t kTimeoutMs = 10 * 60 * 1000; |
| |
  SKIP_IF_SLOW_NOT_ALLOWED();
| |
| ASSERT_OK(cluster_->Start()); |
| |
| // Close an already established idle connection to the server and open |
| // a new one upon making another call to the same server. This is to force |
| // authn token verification at every RPC. |
| FLAGS_rpc_reopen_outbound_connections = true; |
| |
| // Run the write-only workload first. |
| TestWorkload w(cluster_.get()); |
| w.set_table_name(table_name); |
| w.set_num_replicas(num_tablet_servers_); |
| w.set_client_default_admin_operation_timeout_millis(kTimeoutMs); |
| w.set_client_default_rpc_timeout_millis(kTimeoutMs); |
| w.set_num_read_threads(0); |
| w.set_num_write_threads(8); |
| w.set_write_batch_size(256); |
| w.set_write_timeout_millis(kTimeoutMs); |
| w.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); |
| w.Setup(); |
| w.Start(); |
  SleepFor(MonoDelta::FromSeconds(3 * max_token_validity_));
| w.StopAndJoin(); |
| |
| NO_FATALS(cluster_->AssertNoCrashes()); |
| const int64_t rows_inserted = w.rows_inserted(); |
  ASSERT_GT(rows_inserted, 0);
| |
| // Run the read-only workload after the test table is populated. |
| TestWorkload r(cluster_.get()); |
| r.set_table_name(table_name); |
| r.set_num_replicas(num_tablet_servers_); |
| r.set_client_default_admin_operation_timeout_millis(kTimeoutMs); |
| r.set_client_default_rpc_timeout_millis(kTimeoutMs); |
| r.set_num_read_threads(8); |
| r.set_read_timeout_millis(kTimeoutMs); |
| r.set_num_write_threads(0); |
| r.Setup(); |
| r.Start(); |
  SleepFor(MonoDelta::FromSeconds(3 * max_token_validity_));
| r.StopAndJoin(); |
| |
| ClusterVerifier v(cluster_.get()); |
| v.SetOperationsTimeout(MonoDelta::FromSeconds(5 * 60)); |
| NO_FATALS(v.CheckRowCount(table_name, ClusterVerifier::EXACTLY, rows_inserted)); |
| ASSERT_OK(r.Cleanup()); |
| |
| NO_FATALS(cluster_->AssertNoCrashes()); |
| } |
| |
// Scenarios to verify that the client automatically re-acquires its authn
// token upon receiving FATAL_INVALID_AUTHENTICATION_TOKEN from the servers
// when the client has established a token-based connection to the masters.
// Note: this test doesn't rely on authz tokens, but the TSK validity period
// is determined based on all the token validity intervals, so for simplicity
// the authz validity interval is set to be the same.
| class TokenBasedConnectionITest : public AuthTokenExpireITestBase { |
| public: |
| TokenBasedConnectionITest() |
| : AuthTokenExpireITestBase( |
| /*authn_token_validity_seconds=*/ 2, |
| /*authz_token_validity_seconds=*/ 2, |
| /*num_masters=*/ 1, |
| /*num_tablet_servers=*/ 3) { |
| cluster_opts_.extra_master_flags = { |
| Substitute("--authn_token_validity_seconds=$0", authn_token_validity_seconds_), |
| Substitute("--authz_token_validity_seconds=$0", authz_token_validity_seconds_), |
| }; |
| |
| cluster_opts_.extra_tserver_flags = { |
| // Decreasing TS->master heartbeat interval speeds up the test. |
| "--heartbeat_interval_ms=10", |
| }; |
| } |
| |
| void SetUp() override { |
| AuthTokenExpireITestBase::SetUp(); |
| ASSERT_OK(cluster_->Start()); |
| } |
| }; |
| |
// This test verifies that the token re-acquisition logic behaves correctly
// when the connection to the master is established using a previously
// acquired authn token. The master has a particular constraint prohibiting
// the re-issuing of a new authn token over a connection established with an
// authn token itself (otherwise, an authn token would never effectively
// expire).
| TEST_F(TokenBasedConnectionITest, ReacquireAuthnToken) { |
| const string table_name = "authn-token-reacquire"; |
| |
  // Create a client and perform some basic operations to acquire an authn
  // token.
| shared_ptr<KuduClient> client; |
| ASSERT_OK(cluster_->CreateClient( |
| &KuduClientBuilder() |
| .default_admin_operation_timeout(MonoDelta::FromSeconds(60)) |
| .default_rpc_timeout(MonoDelta::FromSeconds(60)), |
| &client)); |
| ASSERT_OK(CreateTable(client.get(), table_name, schema_, num_tablet_servers_)); |
| |
  // Restart the master and the tablet servers to make sure all connections
  // between the client and the servers are closed.
| cluster_->Shutdown(); |
| ASSERT_OK(cluster_->Restart()); |
| |
  // Perform some operations using the already acquired token. The crux here
  // is to establish a connection to the master where the client is
  // authenticated via the authn token, not Kerberos credentials.
| shared_ptr<KuduTable> table; |
| ASSERT_OK(client->OpenTable(table_name, &table)); |
| |
  // Let the authn token expire.
| SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1)); |
| |
  // Here a new authn token should be automatically acquired upon receiving a
  // FATAL_INVALID_AUTHENTICATION_TOKEN error. To get a new token, it's
  // necessary to establish a new connection to the master _not_ using the
  // authn token as client-side credentials.
| ASSERT_OK(InsertTestRows(client.get(), table.get(), num_tablet_servers_)); |
| NO_FATALS(cluster_->AssertNoCrashes()); |
| } |
| |
// Like the test above, but exercising the transaction system client and its
// access to the transaction status table.
| TEST_F(TokenBasedConnectionITest, TxnSystemClientReacquireAuthnToken) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| unique_ptr<TxnSystemClient> txn_client; |
| ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); |
| ASSERT_OK(txn_client->CreateTxnStatusTable(10)); |
| ASSERT_OK(txn_client->OpenTxnStatusTable()); |
| |
  // Reset all connections with the cluster. Since authn token validity is
  // checked for new connections (but not for existing non-idle connections),
  // this ensures our token's expiration is checked below.
| cluster_->Shutdown(); |
| ASSERT_OK(cluster_->Restart()); |
| |
| // Wait for the initial authn token to expire and try to access the cluster. |
| // Try making a connection to the tablet server for the first time. It should |
| // automatically fetch a new token and succeed. |
| SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1)); |
| ASSERT_OK(txn_client->BeginTransaction(1, "user")); |
| } |
| |
| // Test for scenarios involving multiple masters where |
| // client-to-non-leader-master connections are closed due to inactivity, |
| // but the connection to the former leader master is kept open. |
| class MultiMasterIdleConnectionsITest : public AuthTokenExpireITestBase { |
| public: |
| MultiMasterIdleConnectionsITest() |
| : AuthTokenExpireITestBase( |
| /*authn_token_validity_seconds=*/ 3, |
| /*authz_token_validity_seconds=*/ 3, |
| /*num_masters=*/ 3, |
| /*num_tablet_servers=*/ 3) { |
| |
| cluster_opts_.extra_master_flags = { |
| // Custom validity interval for authn tokens. The scenario involves |
| // expiration of authn tokens, while the default authn expiration timeout |
| // is 7 days. So, let's make the token validity interval really short. |
| Substitute("--authn_token_validity_seconds=$0", authn_token_validity_seconds_), |
| Substitute("--authz_token_validity_seconds=$0", authz_token_validity_seconds_), |
| |
      // The default for leader_failure_max_missed_heartbeat_periods is 3.0,
      // but 2.0 is enough to keep master leadership stable, and it makes the
      // test run a bit faster.
| Substitute("--leader_failure_max_missed_heartbeat_periods=$0", |
| master_leader_failure_max_missed_heartbeat_periods_), |
| |
      // Custom Raft heartbeat interval between replicas of the system catalog
      // tablet. The default is 500ms, but the custom setting keeps the test
      // stable enough while making it a bit faster to complete.
| Substitute("--raft_heartbeat_interval_ms=$0", |
| master_raft_hb_interval_ms_), |
| |
      // The default is 65 seconds, but the test scenario needs to run faster.
      // A multiple of (leader_failure_max_missed_heartbeat_periods *
      // raft_heartbeat_interval_ms) is good enough, but it also needs to be
      // greater than the token validity interval due to the scenario's logic.
| Substitute("--rpc_default_keepalive_time_ms=$0", |
| master_rpc_keepalive_time_ms_), |
| }; |
| |
| cluster_opts_.extra_tserver_flags = { |
| // Decreasing TS->master heartbeat interval speeds up the test. |
| "--heartbeat_interval_ms=100", |
| }; |
| } |
| |
| void SetUp() override { |
| AuthTokenExpireITestBase::SetUp(); |
| ASSERT_OK(cluster_->Start()); |
| } |
| |
| protected: |
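  // The masters' RPC keepalive interval is 1.5x the authn token validity
  // interval: long enough that the connection to the leader master, kept
  // active until right before token expiration, survives the subsequent
  // post-expiration wait, yet short enough that idle connections to the
  // follower masters are torn down in the meantime.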
| const int master_rpc_keepalive_time_ms_ = 3 * authn_token_validity_seconds_ * 1000 / 2; |
| const int master_raft_hb_interval_ms_ = 250; |
| const double master_leader_failure_max_missed_heartbeat_periods_ = 2.0; |
| }; |
| |
| // Verify that Kudu C++ client reacquires authn token in the following scenario: |
| // |
| // 1. Client is running against a multi-master cluster. |
| // 2. Client successfully authenticates and gets an authn token by calling |
| // ConnectToCluster. |
| // 3. Client keeps the connection to leader master open, but follower masters |
| // close connections to the client due to inactivity. |
| // 4. After the authn token expires, a change in master leadership happens. |
// 5. Client tries to open a table, first making a request to the former
//    leader master. However, the former leader returns a NOT_THE_LEADER error.
| // |
| // In that situation, the client should reconnect to the cluster to get a new |
| // authn token. Prior to the KUDU-2580 fix, it didn't, and the test was failing |
| // when the client tried to open the test table after master leader re-election: |
| // Timed out: GetTableSchema timed out after deadline expired |
| TEST_F(MultiMasterIdleConnectionsITest, ClientReacquiresAuthnToken) { |
  SKIP_IF_SLOW_NOT_ALLOWED();
| |
| const string kTableName = "keep-connection-to-former-master-leader"; |
| const auto time_start = MonoTime::Now(); |
| |
| shared_ptr<KuduClient> client; |
| { |
| KuduClientBuilder builder; |
| builder.default_rpc_timeout(MonoDelta::FromSeconds(5)); |
| ASSERT_OK(cluster_->CreateClient(&builder, &client)); |
| } |
| ASSERT_OK(CreateTable(client.get(), kTableName, schema_, num_tablet_servers_)); |
| |
| // Wait for the following events: |
| // 1) authn token expires |
| // 2) connections to non-leader masters close |
| |
| const auto time_right_before_token_expiration = time_start + |
| MonoDelta::FromSeconds(authn_token_validity_seconds_); |
| while (MonoTime::Now() < time_right_before_token_expiration) { |
    // Keep the connection to the leader master open, from time to time making
    // requests that go to the leader master, but not to the other masters in
    // the cluster.
    //
    // The leader master might unexpectedly change in the middle of this cycle,
    // but that would not induce errors here. The only negative outcome of such
    // an unexpected re-election would be missing the conditions of the
    // reference scenario; however, given the relative stability of the Raft
    // leadership role with the current parameters for the masters' Raft
    // consensus, the reference condition is reproduced in the vast majority
    // of runs.
| shared_ptr<KuduTable> table; |
| ASSERT_OK(client->OpenTable(kTableName, &table)); |
| SleepFor(MonoDelta::FromMilliseconds(250)); |
| } |
| |
| int former_leader_master_idx; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&former_leader_master_idx)); |
| |
  // Given the relation between the master_rpc_keepalive_time_ms_ and
  // authn_token_validity_seconds_ parameters, the original authn token should
  // expire and connections to the follower masters should be torn down due to
  // inactivity, but the connection to the leader master should still be kept
  // open after waiting for an additional token expiration interval.
| SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_)); |
| |
| ASSERT_EVENTUALLY([&] { |
| // The leadership could change behind the scenes, so if a new leader master |
| // is already around, another leadership change isn't needed. |
| int leader_idx; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx)); |
| if (former_leader_master_idx == leader_idx) { |
      // Make a request to the current leader master to step down, transferring
      // the leadership role to another master.
| consensus::ConsensusServiceProxy proxy( |
| cluster_->messenger(), cluster_->master(leader_idx)->bound_rpc_addr(), |
| cluster_->master(leader_idx)->bound_rpc_hostport().host()); |
| consensus::LeaderStepDownRequestPB req; |
| req.set_dest_uuid(cluster_->master(leader_idx)->uuid()); |
| req.set_tablet_id(master::SysCatalogTable::kSysCatalogTabletId); |
| req.set_mode(consensus::LeaderStepDownMode::GRACEFUL); |
| consensus::LeaderStepDownResponsePB resp; |
| rpc::RpcController rpc; |
| rpc.set_timeout(MonoDelta::FromSeconds(3)); |
| ASSERT_OK(proxy.LeaderStepDown(req, &resp, &rpc)); |
| |
| // Make sure the leader has actually changed. If not, the step-down |
| // request is retried until ASSERT_EVENTUALLY() times out. |
| int idx; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&idx)); |
| ASSERT_NE(former_leader_master_idx, idx); |
| } |
| }); |
| |
// Try to open the table after leader master re-election. The former leader
// responds with a NOT_THE_LEADER error even if the authn token has expired
// (i.e. the client will not get a FATAL_INVALID_AUTHENTICATION_TOKEN error).
// That's because the connection between the client and the former leader
// master was established at the beginning and kept open during the test.
// However, the client should detect that condition and reconnect to the
// cluster for a new authn token.
| shared_ptr<KuduTable> table; |
| ASSERT_OK(client->OpenTable(kTableName, &table)); |
| } |
| |
| } // namespace client |
| } // namespace kudu |