| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <cstdlib> |
| #include <memory> |
| #include <string> |
| #include <thread> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/tablet/key_value_test_schema.h" |
| #include "kudu/transactions/txn_system_client.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| using kudu::client::KuduClient; |
| using kudu::client::KuduClientBuilder; |
| using kudu::client::KuduInsert; |
| using kudu::client::KuduSession; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableCreator; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::ExternalMaster; |
| using kudu::cluster::ExternalMiniCluster; |
| using kudu::cluster::ExternalMiniClusterOptions; |
| using kudu::cluster::ExternalTabletServer; |
| using kudu::cluster::ScopedResumeExternalDaemon; |
| using kudu::transactions::TxnSystemClient; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| DECLARE_bool(rpc_reopen_outbound_connections); |
| DECLARE_int64(rpc_negotiation_timeout_ms); |
| |
| namespace kudu { |
| |
| // Series of tests to verify that client fails over to another available server |
| // if it experiences a timeout on connection negotiation with current server. |
| // The 'server' can be a master or a tablet server. |
| class ClientFailoverOnNegotiationTimeoutITest : public KuduTest { |
| public: |
| ClientFailoverOnNegotiationTimeoutITest() { |
| // Since we want to catch timeout during connection negotiation phase, |
| // let's make the client re-establishing connections on every RPC. |
| FLAGS_rpc_reopen_outbound_connections = true; |
| |
| cluster_opts_.extra_tserver_flags = { |
| // Speed up Raft elections. |
| "--raft_heartbeat_interval_ms=25", |
| "--leader_failure_exp_backoff_max_delta_ms=1000", |
| // Decreasing TS->master heartbeat interval speeds up the test. |
| "--heartbeat_interval_ms=25", |
| "--enable_txn_system_client_init=true", |
| }; |
| cluster_opts_.extra_master_flags = { |
| // Speed up Raft elections. |
| "--raft_heartbeat_interval_ms=25", |
| "--leader_failure_exp_backoff_max_delta_ms=1000", |
| }; |
| } |
| |
| Status CreateAndStartCluster() { |
| cluster_.reset(new ExternalMiniCluster(cluster_opts_)); |
| return cluster_->Start(); |
| } |
| |
| shared_ptr<KuduClient> CreateClient( |
| const MonoDelta& rpc_timeout, |
| const MonoDelta& negotiation_timeout = {}) { |
| KuduClientBuilder b; |
| b.default_admin_operation_timeout(rpc_timeout); |
| b.default_rpc_timeout(rpc_timeout); |
| if (negotiation_timeout.Initialized()) { |
| b.connection_negotiation_timeout(negotiation_timeout); |
| } |
| shared_ptr<KuduClient> client; |
| CHECK_OK(cluster_->CreateClient(&b, &client)); |
| return client; |
| } |
| |
| void TearDown() override { |
| if (cluster_) { |
| cluster_->Shutdown(); |
| } |
| KuduTest::TearDown(); |
| } |
| |
| protected: |
| // Set the connection negotiation timeout shorter than its default value |
| // to run the test faster. For sanitizer builds we don't want the timeout |
| // to be too short: running the test concurrently with other activities |
| // might lead client to fail even if the client retries again and again. |
| #if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER) |
| const MonoDelta kNegotiationTimeout = MonoDelta::FromMilliseconds(3000); |
| #else |
| const MonoDelta kNegotiationTimeout = MonoDelta::FromMilliseconds(500); |
| #endif |
| ExternalMiniClusterOptions cluster_opts_; |
| shared_ptr<ExternalMiniCluster> cluster_; |
| }; |
| |
| // Regression test for KUDU-1580: if a client times out on negotiating a connection |
| // to a tablet server, it should retry with other available tablet server. |
| TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu1580ConnectToTServer) { |
| static const int kNumTabletServers = 3; |
| static const int kTimeoutMs = 5 * 60 * 1000; |
| static const char* kTableName = "kudu1580"; |
| |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| cluster_opts_.num_tablet_servers = kNumTabletServers; |
| ASSERT_OK(CreateAndStartCluster()); |
| |
| shared_ptr<KuduClient> client = CreateClient( |
| MonoDelta::FromMilliseconds(kTimeoutMs), kNegotiationTimeout); |
| ASSERT_NE(nullptr, client.get()); |
| unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator()); |
| KuduSchema schema(KuduSchema::FromSchema(CreateKeyValueTestSchema())); |
| ASSERT_OK(table_creator->table_name(kTableName) |
| .schema(&schema) |
| .add_hash_partitions({ "key" }, kNumTabletServers) |
| .num_replicas(kNumTabletServers) |
| .Create()); |
| shared_ptr<KuduTable> table; |
| ASSERT_OK(client->OpenTable(kTableName, &table)); |
| |
| shared_ptr<KuduSession> session = client->NewSession(); |
| session->SetTimeoutMillis(kTimeoutMs); |
| ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC)); |
| |
| // Running multiple iterations to cover possible variations of tablet leader |
| // placement among tablet servers. |
| for (int i = 0; i < 8 * kNumTabletServers; ++i) { |
| vector<unique_ptr<ScopedResumeExternalDaemon>> resumers; |
| for (int tsi = 0; tsi < kNumTabletServers; ++tsi) { |
| ExternalTabletServer* ts(cluster_->tablet_server(tsi)); |
| ASSERT_NE(nullptr, ts); |
| ASSERT_OK(ts->Pause()); |
| resumers.emplace_back(new ScopedResumeExternalDaemon(ts)); |
| } |
| |
| // Resume 2 out of 3 tablet servers (i.e. the majority), so the client |
| // could eventially succeed with its write operations. |
| thread resume_thread([&]() { |
| const int idx0 = rand() % kNumTabletServers; |
| unique_ptr<ScopedResumeExternalDaemon> r0(resumers[idx0].release()); |
| const int idx1 = (idx0 + 1) % kNumTabletServers; |
| unique_ptr<ScopedResumeExternalDaemon> r1(resumers[idx1].release()); |
| SleepFor(MonoDelta::FromSeconds(1)); |
| }); |
| // An automatic clean-up to handle both success and failure cases |
| // in the code below. |
| SCOPED_CLEANUP({ |
| resume_thread.join(); |
| }); |
| |
| // Since the table is hash-partitioned with kNumTabletServer partitions, |
| // hopefully three sequential numbers would go into different partitions. |
| for (int ii = 0; ii < kNumTabletServers; ++ii) { |
| unique_ptr<KuduInsert> ins(table->NewInsert()); |
| ASSERT_OK(ins->mutable_row()->SetInt32(0, kNumTabletServers * i + ii)); |
| ASSERT_OK(ins->mutable_row()->SetInt32(1, 0)); |
| ASSERT_OK(session->Apply(ins.release())); |
| } |
| } |
| } |
| |
| // Like the above test but testing the transaction system client. |
| TEST_F(ClientFailoverOnNegotiationTimeoutITest, TestTxnSystemClientRetryOnPause) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| static const int kNumTabletServers = 3; |
| cluster_opts_.num_tablet_servers = kNumTabletServers; |
| ASSERT_OK(CreateAndStartCluster()); |
| |
| unique_ptr<TxnSystemClient> txn_client; |
| ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), |
| cluster_->service_principal(), |
| &txn_client)); |
| ASSERT_OK(txn_client->CreateTxnStatusTable(100, kNumTabletServers)); |
| ASSERT_OK(txn_client->OpenTxnStatusTable()); |
| |
| vector<unique_ptr<ScopedResumeExternalDaemon>> resumers; |
| for (int i = 0; i < kNumTabletServers; i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| ASSERT_OK(cluster_->tablet_server(i)->Pause()); |
| resumers.emplace_back(new ScopedResumeExternalDaemon(ts)); |
| } |
| |
| // Resume a random majority so the system client can proceed. |
| thread resume_thread([&]() { |
| const int idx0 = rand() % kNumTabletServers; |
| unique_ptr<ScopedResumeExternalDaemon> r0(resumers[idx0].release()); |
| const int idx1 = (idx0 + 1) % kNumTabletServers; |
| unique_ptr<ScopedResumeExternalDaemon> r1(resumers[idx1].release()); |
| SleepFor(MonoDelta::FromSeconds(1)); |
| }); |
| SCOPED_CLEANUP({ |
| resume_thread.join(); |
| }); |
| |
| for (int i = 1; i < 10; i++) { |
| ASSERT_OK(txn_client->BeginTransaction(i, "bob")); |
| } |
| } |
| |
| // Regression test for KUDU-2021: if client times out on establishing a |
| // connection to the leader master, it should retry with other master in case of |
| // a multi-master configuration. |
| TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu2021ConnectToMaster) { |
| static const int kNumMasters = 3; |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(60); |
| |
| cluster_opts_.num_masters = kNumMasters; |
| cluster_opts_.num_tablet_servers = 1; |
| ASSERT_OK(CreateAndStartCluster()); |
| |
| shared_ptr<KuduClient> client = CreateClient(kTimeout, kNegotiationTimeout); |
| ASSERT_NE(nullptr, client.get()); |
| |
| // Make a call to the master to populate the client's metadata cache. |
| vector<string> tables; |
| ASSERT_OK(client->ListTables(&tables)); |
| |
| // Pause the leader master so next call client would time out on connection |
| // negotiation. Do that few times. |
| for (int i = 0; i < kNumMasters; ++i) { |
| int leader_idx; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx)); |
| ASSERT_OK(cluster_->master(leader_idx)->Pause()); |
| ScopedResumeExternalDaemon resume_daemon(cluster_->master(leader_idx)); |
| |
| ASSERT_OK(client->ListTables(&tables)); |
| } |
| } |
| |
| // Regression test for KUDU-2021: if client times out on negotiating a |
| // connection with the master, it should retry with other master in case of |
| // a multi-master configuration. |
| TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu2021NegotiateWithMaster) { |
| static const int kNumMasters = 3; |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(60); |
| |
| cluster_opts_.num_masters = kNumMasters; |
| cluster_opts_.num_tablet_servers = 1; |
| ASSERT_OK(CreateAndStartCluster()); |
| |
| shared_ptr<KuduClient> client = CreateClient(kTimeout, kNegotiationTimeout); |
| ASSERT_NE(nullptr, client.get()); |
| |
| // Check client can successfully call ListTables(). |
| vector<string> tables; |
| ASSERT_OK(client->ListTables(&tables)); |
| |
| // The test sets the client-side RPC negotiation timeout via the flag |
| // 'rpc_negotiation_timeout_ms'. We want the client to open a TCP connection |
| // and start the negotiation process with the leader master and then time out |
| // during the negotiation process. For the test to check the client's behavior |
| // on timing out while establishing a TCP connection see the |
| // Kudu2021ConnectToMaster test above. |
| // |
| // So, after the client times out on the negotiation process, it should re-resolve |
| // the leader master and connect to the new leader. Since the former leader |
| // has been paused in the middle of the negotiation, the client is supposed to |
| // connect to the new leader and succeed with its ListTables() call. |
| int leader_idx; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx)); |
| ExternalMaster* m = cluster_->master(leader_idx); |
| ASSERT_OK( |
| cluster_->SetFlag(m, "rpc_negotiation_inject_delay_ms", |
| Substitute("$0", FLAGS_rpc_negotiation_timeout_ms * 2))); |
| thread pause_thread([&]() { |
| SleepFor(MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_timeout_ms / 2)); |
| CHECK_OK(m->Pause()); |
| }); |
| // An automatic clean-up to handle both success and failure cases. |
| SCOPED_CLEANUP({ |
| pause_thread.join(); |
| CHECK_OK(m->Resume()); |
| }); |
| |
| // After an attempt to negotiate with the former leader master, timing out, |
| // and re-resolving the leader master, the client will eventually connect to |
| // a new leader master elected after the former one is paused. The new leader |
| // master doesn't impose any negotiation delay, so the client should succeed |
| // with the ListTables() call. |
| ASSERT_OK(client->ListTables(&tables)); |
| } |
| |
| } // namespace kudu |