[client] Fix a kudu c++ client bug when using replica_selection policy
On the C++ client side, the replica_selection policy (LEADER_ONLY or
CLOSEST_REPLICA) is not honored. For example, the command 'kudu perf
table_scan $master_list $table -columns=id,name -num_threads=4
-nofill_cache -replica_selection="LEADER"' requests LEADER_ONLY, but
the replica selection policy actually in effect is CLOSEST_REPLICA.
The patch fixes the bug in client library and adds unit tests.
Change-Id: I413f99b6a0b6082c5453358b8333913e4c6264c2
Reviewed-on: http://gerrit.cloudera.org:8080/18877
Reviewed-by: Yuqi Du <shenxingwuying@gmail.com>
Tested-by: Kudu Jenkins
Reviewed-by: Yifan Zhang <chinazhangyifan@163.com>
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 82f3934..8db6940 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1063,6 +1063,7 @@
friend class tools::LeaderMasterProxy;
friend class tools::RemoteKsckCluster;
friend class tools::TableLister;
+ friend class ScanTokenTest;
FRIEND_TEST(kudu::ClientStressTest, TestUniqueClientIds);
FRIEND_TEST(kudu::MetaCacheLookupStressTest, PerfSynthetic);
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 91360e6..ff16460 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -237,6 +237,23 @@
configuration->AddConjunctPredicate(std::move(*predicate));
}
+ switch (message.replica_selection()) {
+ case kudu::ReplicaSelection::LEADER_ONLY:
+ RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::LEADER_ONLY),
+ ERROR, "set replica selection LEADER_ONLY failed");
+ break;
+ case kudu::ReplicaSelection::CLOSEST_REPLICA:
+ RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::CLOSEST_REPLICA),
+ ERROR, "set replica selection CLOSEST_REPLICA failed");
+ break;
+ case kudu::ReplicaSelection::FIRST_REPLICA:
+ RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::FIRST_REPLICA),
+ ERROR, "set replica selection FIRST_REPLICA failed");
+ break;
+ default:
+ return Status::NotSupported("unsupported ReplicaSelection policy");
+ }
+
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
if (message.has_lower_bound_primary_key()) {
@@ -394,6 +411,19 @@
ColumnPredicateToPB(predicate_pair.second, pb.add_column_predicates());
}
+ switch (configuration_.selection_) {
+ case KuduClient::ReplicaSelection::LEADER_ONLY:
+ pb.set_replica_selection(kudu::ReplicaSelection::LEADER_ONLY);
+ break;
+ case KuduClient::ReplicaSelection::CLOSEST_REPLICA:
+ pb.set_replica_selection(kudu::ReplicaSelection::CLOSEST_REPLICA);
+ break;
+ case KuduClient::ReplicaSelection::FIRST_REPLICA:
+ pb.set_replica_selection(kudu::ReplicaSelection::FIRST_REPLICA);
+ break;
+ default:
+ return Status::InvalidArgument("replica_selection is invalid.");
+ }
const KuduScanner::ReadMode read_mode = configuration_.read_mode();
switch (read_mode) {
case KuduScanner::READ_LATEST:
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index fca2ccb..792faa6 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -18,9 +18,12 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
+#include <functional>
+#include <map>
#include <memory>
#include <string>
#include <thread>
+#include <type_traits>
#include <unordered_set>
#include <utility>
#include <vector>
@@ -29,9 +32,11 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "kudu/client/client-internal.h"
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/client.pb.h"
+#include "kudu/client/meta_cache.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_configuration.h"
#include "kudu/client/scan_predicate.h"
@@ -43,18 +48,22 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/test_workload.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/scanners.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/async_util.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
@@ -76,6 +85,7 @@
using kudu::tablet::TabletReplica;
using kudu::tserver::MiniTabletServer;
using std::atomic;
+using std::map;
using std::string;
using std::thread;
using std::unique_ptr;
@@ -85,6 +95,9 @@
namespace kudu {
namespace client {
+static constexpr const int32_t kRecordCount = 1000;
+static constexpr const int32_t kBucketNum = 10;
+
class ScanTokenTest : public KuduTest {
protected:
void SetUp() override {
@@ -135,13 +148,16 @@
// Similar to CountRows() above, but use the specified client handle
// and run all the scanners sequentially, one by one.
- static Status CountRowsSeq(KuduClient* client,
- const vector<KuduScanToken*>& tokens,
- int64_t* row_count) {
+ static Status CountRowsSeq(
+ KuduClient* client,
+ const vector<KuduScanToken*>& tokens,
+ int64_t* row_count,
+ KuduClient::ReplicaSelection replica_selection = KuduClient::ReplicaSelection::LEADER_ONLY) {
int64_t count = 0;
for (auto* t : tokens) {
unique_ptr<KuduScanner> scanner;
RETURN_NOT_OK(IntoUniqueScanner(client, *t, &scanner));
+ RETURN_NOT_OK(scanner->SetSelection(replica_selection));
RETURN_NOT_OK(scanner->Open());
while (scanner->HasMoreRows()) {
KuduScanBatch batch;
@@ -246,6 +262,79 @@
return Status::OK();
}
+ // Starts a fresh 3-tablet-server mini cluster, fills a hash-partitioned table
+ // named "replica_selection" (kBucketNum tablets, replication factor 3) until at
+ // least kRecordCount rows have been inserted, then opens the table into
+ // 'table' and stores the tablet id of every scan token built for it in
+ // 'tablet_ids'.
+ //
+ // Uses fatal gtest assertions, so it returns early on the first failure.
+ void PrepareEnvForTestReplicaSelection(shared_ptr<KuduTable>* table, vector<string>* tablet_ids) {
+   constexpr int kReplicationFactor = 3;
+   constexpr const char* const kTableName = "replica_selection";
+
+   // Bring up the mini cluster.
+   InternalMiniClusterOptions cluster_opts;
+   cluster_opts.num_tablet_servers = 3;
+   cluster_.reset(new InternalMiniCluster(env_, cluster_opts));
+   ASSERT_OK(cluster_->Start());
+
+   // Load data to scan later. The workload lives in its own scope so that it
+   // is destroyed before the test client is created.
+   {
+     TestWorkload writer(cluster_.get(), TestWorkload::PartitioningType::HASH);
+     writer.set_table_name(kTableName);
+     writer.set_num_tablets(kBucketNum);
+     writer.set_num_replicas(kReplicationFactor);
+     writer.set_num_write_threads(10);
+     writer.set_write_batch_size(128);
+     writer.Setup();
+     writer.Start();
+     // Keep writing until enough rows have landed.
+     ASSERT_EVENTUALLY([&]() {
+       ASSERT_GE(writer.rows_inserted(), kRecordCount);
+     });
+     writer.StopAndJoin();
+   }
+
+   ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+   ASSERT_OK(client_->OpenTable(kTableName, table));
+   ASSERT_NE(nullptr, table->get());
+
+   // Build the scan tokens once just to learn the tablet ids; the tokens
+   // themselves are released when 'token_cleanup' goes out of scope.
+   vector<client::KuduScanToken*> scan_tokens;
+   ElementDeleter token_cleanup(&scan_tokens);
+   ASSERT_OK(client::KuduScanTokenBuilder(table->get()).Build(&scan_tokens));
+
+   tablet_ids->clear();
+   tablet_ids->reserve(scan_tokens.size());
+   for (const auto* scan_token : scan_tokens) {
+     tablet_ids->push_back(scan_token->tablet().id());
+   }
+ }
+
+ // For every tablet in 'tablet_ids', looks the tablet up through the client's
+ // meta cache, asks the client which tablet server it picks under
+ // 'replication_selection', and increments that server's entry (keyed by its
+ // permanent UUID) in 'replica_num_by_ts_uuid'. Existing entries in the map
+ // are kept and added to.
+ //
+ // Uses fatal gtest assertions, so it returns early on the first failure;
+ // callers should check for fatal failures after calling it.
+ void GetSelectedReplicaCount(const vector<string>& tablet_ids,
+                              KuduClient::ReplicaSelection replication_selection,
+                              map<string, int32_t>* replica_num_by_ts_uuid) {
+   for (const auto& tablet_id : tablet_ids) {
+     scoped_refptr<internal::RemoteTablet> rt;
+     Synchronizer sync;
+     client_->data_->meta_cache_->LookupTabletById(
+         client_.get(), tablet_id, MonoTime::Max(), &rt, sync.AsStatusCallback());
+     // Do not touch 'rt' unless the asynchronous lookup reported success.
+     ASSERT_OK(sync.Wait());
+     ASSERT_NE(nullptr, rt.get());
+
+     vector<internal::RemoteTabletServer*> tservers;
+     rt->GetRemoteTabletServers(&tservers);
+     ASSERT_EQ(3, tservers.size());
+
+     vector<internal::RemoteTabletServer*> candidates;
+     internal::RemoteTabletServer* tserver_picked = nullptr;
+     ASSERT_OK(client_->data_->GetTabletServer(
+         client_.get(), rt, replication_selection, {}, &candidates, &tserver_picked));
+     // Guard the dereference below in case no server was handed back.
+     ASSERT_NE(nullptr, tserver_picked);
+     ++LookupOrInsert(replica_num_by_ts_uuid, tserver_picked->permanent_uuid(), 0);
+   }
+ }
+
+ // Rebuilds 'scanner_count_by_ts_uuid' from scratch: for each tablet server of
+ // the mini cluster, maps the server's permanent UUID to the number of scans
+ // currently listed by its scanner manager.
+ void GetScannerCount(map<string, int32_t>* scanner_count_by_ts_uuid) {
+   scanner_count_by_ts_uuid->clear();
+   const int num_servers = cluster_->num_tablet_servers();
+   for (int idx = 0; idx < num_servers; ++idx) {
+     auto* ts = cluster_->mini_tablet_server(idx)->server();
+     const auto num_scans = static_cast<int32_t>(ts->scanner_manager()->ListScans().size());
+     scanner_count_by_ts_uuid->emplace(ts->instance_pb().permanent_uuid(), num_scans);
+   }
+ }
+
shared_ptr<KuduClient> client_;
unique_ptr<InternalMiniCluster> cluster_;
};
@@ -1475,5 +1564,50 @@
ASSERT_EQ(KuduScanner::READ_YOUR_WRITES, sc.read_mode());
}
+// Fixture parameterized on the replica selection policy under test. It adds
+// nothing of its own beyond the helpers inherited from ScanTokenTest.
+class ReplicaSelectionTest
+    : public ScanTokenTest,
+      public ::testing::WithParamInterface<KuduClient::ReplicaSelection> {
+};
+
+// Instantiate the ReplicaSelectionTest cases once per replica selection policy.
+INSTANTIATE_TEST_SUITE_P(
+    PickServer,
+    ReplicaSelectionTest,
+    ::testing::Values(KuduClient::ReplicaSelection::LEADER_ONLY,
+                      KuduClient::ReplicaSelection::CLOSEST_REPLICA,
+                      KuduClient::ReplicaSelection::FIRST_REPLICA));
+
+// TODO(duyuqi)
+// Using location assignment to test replica selection for ScanToken, refer to:
+// src/kudu/integration-tests/location_assignment-itest.cc#L76-L150
+//
+// This unit test checks whether LEADER_ONLY/CLOSEST_REPLICA/FIRST_REPLICA replica selection works
+// as expected: for every tablet server, the number of scanners that show up
+// after scanning all tokens must equal the number of tablets for which the
+// client picks that server under the policy being tested.
+TEST_P(ReplicaSelectionTest, ReplicaSelection) {
+  const auto replica_selection = GetParam();
+  shared_ptr<KuduTable> table;
+  vector<string> tablet_ids;
+  // The helpers report problems through fatal assertions, which only return
+  // from the helper itself; stop here as well if any of them fired.
+  ASSERT_NO_FATAL_FAILURE(PrepareEnvForTestReplicaSelection(&table, &tablet_ids));
+
+  // Expected outcome: how many tablets the client routes to each server.
+  map<string, int32_t> replica_num_by_ts_uuid;
+  ASSERT_NO_FATAL_FAILURE(
+      GetSelectedReplicaCount(tablet_ids, replica_selection, &replica_num_by_ts_uuid));
+
+  // Baseline scanner counts before any scan of this test is launched.
+  map<string, int32_t> scanner_count_by_ts_uuid;
+  GetScannerCount(&scanner_count_by_ts_uuid);
+
+  // Scan all the partitions by specific replica selection.
+  // Launch scan requests.
+  vector<KuduScanToken*> tokens;
+  ElementDeleter deleter(&tokens);
+  ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+  int64_t row_count = 0;
+  // A failed scan must fail the test directly rather than surface later as a
+  // scanner-count mismatch.
+  ASSERT_OK(CountRowsSeq(client_.get(), tokens, &row_count, replica_selection));
+
+  int result = 0;
+  map<string, int32_t> now_scanner_count_by_ts_uuid;
+  GetScannerCount(&now_scanner_count_by_ts_uuid);
+  for (const auto& ts_uuid_scanner_count : now_scanner_count_by_ts_uuid) {
+    const auto& permanent_uuid = ts_uuid_scanner_count.first;
+    // operator[] yields 0 for servers that were never picked.
+    const int32_t expected_new_scanners = replica_num_by_ts_uuid[permanent_uuid];
+    ASSERT_EQ(expected_new_scanners,
+              (ts_uuid_scanner_count.second - scanner_count_by_ts_uuid[permanent_uuid]));
+    result += expected_new_scanners;
+  }
+  // Every tablet must have been attributed to exactly one server.
+  ASSERT_EQ(kBucketNum, result);
+}
+
} // namespace client
} // namespace kudu
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index 8117b65..2779d85 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -312,6 +312,8 @@
// - Replicas whose tablet server has the same location as the client
// - All other replicas
CLOSEST_REPLICA = 2;
+ // Select the first replica in the list.
+ FIRST_REPLICA = 3;
}
// The serialized format of a Kudu table partition schema.
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 5ed43d0..614cce8 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -170,11 +170,14 @@
}
constexpr const char* const kReplicaSelectionClosest = "closest";
+constexpr const char* const kReplicaSelectionFirst = "first";
constexpr const char* const kReplicaSelectionLeader = "leader";
+
bool ValidateReplicaSelection(const char* flag_name,
const string& flag_value) {
static const vector<string> kReplicaSelections = {
kReplicaSelectionClosest,
+ kReplicaSelectionFirst,
kReplicaSelectionLeader,
};
return IsFlagValueAcceptable(flag_name, flag_value, kReplicaSelections);
@@ -637,7 +640,7 @@
}
Status TableScanner::SetReplicaSelection(const string& selection_str) {
- KuduClient::ReplicaSelection selection;
+ KuduClient::ReplicaSelection selection = KuduClient::ReplicaSelection::CLOSEST_REPLICA;
RETURN_NOT_OK(ParseReplicaSelection(selection_str, &selection));
replica_selection_ = selection;
return Status::OK();
@@ -799,6 +802,8 @@
*selection = KuduClient::ReplicaSelection::CLOSEST_REPLICA;
} else if (iequals(kReplicaSelectionLeader, selection_str)) {
*selection = KuduClient::ReplicaSelection::LEADER_ONLY;
+ } else if (iequals(kReplicaSelectionFirst, selection_str)) {
+ *selection = KuduClient::ReplicaSelection::FIRST_REPLICA;
} else {
return Status::InvalidArgument("invalid replica selection", selection_str);
}