[client] Fix a kudu c++ client bug when using replica_selection policy

At c++ client side, the replica_selection policy (LEADER_ONLY and
CLOSEST_REPLICA) is not working. Eg command: 'kudu perf table_scan
$master_list $table -columns=id,name -num_threads=4 -nofill_cache
-replica_selection="LEADER"', but the real replica_selection policy
is CLOSEST_REPLICA.

The patch fixes the bug in client library and adds unit tests.

Change-Id: I413f99b6a0b6082c5453358b8333913e4c6264c2
Reviewed-on: http://gerrit.cloudera.org:8080/18877
Reviewed-by: Yuqi Du <shenxingwuying@gmail.com>
Tested-by: Kudu Jenkins
Reviewed-by: Yifan Zhang <chinazhangyifan@163.com>
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 82f3934..8db6940 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1063,6 +1063,7 @@
   friend class tools::LeaderMasterProxy;
   friend class tools::RemoteKsckCluster;
   friend class tools::TableLister;
+  friend class ScanTokenTest;
 
   FRIEND_TEST(kudu::ClientStressTest, TestUniqueClientIds);
   FRIEND_TEST(kudu::MetaCacheLookupStressTest, PerfSynthetic);
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 91360e6..ff16460 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -237,6 +237,23 @@
     configuration->AddConjunctPredicate(std::move(*predicate));
   }
 
+  switch (message.replica_selection()) {
+    case kudu::ReplicaSelection::LEADER_ONLY:
+      RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::LEADER_ONLY),
+                        ERROR, "set replica selection LEADER_ONLY failed");
+      break;
+    case kudu::ReplicaSelection::CLOSEST_REPLICA:
+      RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::CLOSEST_REPLICA),
+                        ERROR, "set replica selection CLOSEST_REPLICA failed");
+      break;
+    case kudu::ReplicaSelection::FIRST_REPLICA:
+      RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::FIRST_REPLICA),
+                        ERROR, "set replica selection FIRST_REPLICA failed");
+      break;
+    default:
+      return Status::NotSupported("unsupported ReplicaSelection policy");
+  }
+
 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
   if (message.has_lower_bound_primary_key()) {
@@ -394,6 +411,19 @@
     ColumnPredicateToPB(predicate_pair.second, pb.add_column_predicates());
   }
 
+  switch (configuration_.selection_) {
+    case KuduClient::ReplicaSelection::LEADER_ONLY:
+      pb.set_replica_selection(kudu::ReplicaSelection::LEADER_ONLY);
+      break;
+    case KuduClient::ReplicaSelection::CLOSEST_REPLICA:
+      pb.set_replica_selection(kudu::ReplicaSelection::CLOSEST_REPLICA);
+      break;
+    case KuduClient::ReplicaSelection::FIRST_REPLICA:
+      pb.set_replica_selection(kudu::ReplicaSelection::FIRST_REPLICA);
+      break;
+    default:
+      return Status::InvalidArgument("replica_selection is invalid.");
+  }
   const KuduScanner::ReadMode read_mode = configuration_.read_mode();
   switch (read_mode) {
     case KuduScanner::READ_LATEST:
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index fca2ccb..792faa6 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -18,9 +18,12 @@
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
+#include <functional>
+#include <map>
 #include <memory>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <unordered_set>
 #include <utility>
 #include <vector>
@@ -29,9 +32,11 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/client/client-internal.h"
 #include "kudu/client/client-test-util.h"
 #include "kudu/client/client.h"
 #include "kudu/client/client.pb.h"
+#include "kudu/client/meta_cache.h"
 #include "kudu/client/scan_batch.h"
 #include "kudu/client/scan_configuration.h"
 #include "kudu/client/scan_predicate.h"
@@ -43,18 +48,22 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/test_workload.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/async_util.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
@@ -76,6 +85,7 @@
 using kudu::tablet::TabletReplica;
 using kudu::tserver::MiniTabletServer;
 using std::atomic;
+using std::map;
 using std::string;
 using std::thread;
 using std::unique_ptr;
@@ -85,6 +95,9 @@
 namespace kudu {
 namespace client {
 
+static constexpr const int32_t kRecordCount = 1000;
+static constexpr const int32_t kBucketNum = 10;
+
 class ScanTokenTest : public KuduTest {
  protected:
   void SetUp() override {
@@ -135,13 +148,16 @@
 
   // Similar to CountRows() above, but use the specified client handle
   // and run all the scanners sequentially, one by one.
-  static Status CountRowsSeq(KuduClient* client,
-                             const vector<KuduScanToken*>& tokens,
-                             int64_t* row_count) {
+  static Status CountRowsSeq(
+      KuduClient* client,
+      const vector<KuduScanToken*>& tokens,
+      int64_t* row_count,
+      KuduClient::ReplicaSelection replica_selection = KuduClient::ReplicaSelection::LEADER_ONLY) {
     int64_t count = 0;
     for (auto* t : tokens) {
       unique_ptr<KuduScanner> scanner;
       RETURN_NOT_OK(IntoUniqueScanner(client, *t, &scanner));
+      RETURN_NOT_OK(scanner->SetSelection(replica_selection));
       RETURN_NOT_OK(scanner->Open());
       while (scanner->HasMoreRows()) {
         KuduScanBatch batch;
@@ -246,6 +262,79 @@
     return Status::OK();
   }
 
+  void PrepareEnvForTestReplicaSelection(shared_ptr<KuduTable>* table, vector<string>* tablet_ids) {
+    constexpr const char* const kTableName = "replica_selection";
+    // Set up the mini cluster
+    InternalMiniClusterOptions options;
+    options.num_tablet_servers = 3;
+    cluster_.reset(new InternalMiniCluster(env_, options));
+    ASSERT_OK(cluster_->Start());
+    constexpr int kReplicationFactor = 3;
+
+    // Populate the table with data to scan later.
+    {
+      // Create a table with 10 partitions, 3 replication factor.
+      // and write some rows to make sure all partitions have data.
+      TestWorkload workload(cluster_.get(), TestWorkload::PartitioningType::HASH);
+      workload.set_table_name(kTableName);
+      workload.set_num_tablets(kBucketNum);
+      workload.set_num_replicas(kReplicationFactor);
+      workload.set_num_write_threads(10);
+      workload.set_write_batch_size(128);
+      workload.Setup();
+      workload.Start();
+      ASSERT_EVENTUALLY([&]() { ASSERT_GE(workload.rows_inserted(), kRecordCount); });
+      workload.StopAndJoin();
+    }
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+    ASSERT_OK(client_->OpenTable(kTableName, table));
+    ASSERT_NE(nullptr, table->get());
+
+    vector<client::KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    client::KuduScanTokenBuilder builder(table->get());
+    ASSERT_OK(builder.Build(&tokens));
+
+    tablet_ids->clear();
+    tablet_ids->reserve(tokens.size());
+    for (const auto* token : tokens) {
+      tablet_ids->emplace_back(token->tablet().id());
+    }
+  }
+
+  void GetSelectedReplicaCount(const vector<string>& tablet_ids,
+                               KuduClient::ReplicaSelection replication_selection,
+                               map<string, int32_t>* replica_num_by_ts_uuid) {
+    for (const auto& tablet_id : tablet_ids) {
+      scoped_refptr<internal::RemoteTablet> rt;
+      Synchronizer sync;
+      client_->data_->meta_cache_->LookupTabletById(
+          client_.get(), tablet_id, MonoTime::Max(), &rt, sync.AsStatusCallback());
+      sync.Wait();
+      vector<internal::RemoteTabletServer*> tservers;
+      rt->GetRemoteTabletServers(&tservers);
+      ASSERT_EQ(3, tservers.size());
+
+      vector<internal::RemoteTabletServer*> candidates;
+      internal::RemoteTabletServer* tserver_picked;
+      ASSERT_OK(client_->data_->GetTabletServer(
+          client_.get(), rt, replication_selection, {}, &candidates, &tserver_picked));
+      auto& count = LookupOrInsert(replica_num_by_ts_uuid, tserver_picked->permanent_uuid(), 0);
+      count++;
+    }
+  }
+
+  void GetScannerCount(map<string, int32_t>* scanner_count_by_ts_uuid) {
+    scanner_count_by_ts_uuid->clear();
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      vector<tserver::ScanDescriptor> scanners =
+          cluster_->mini_tablet_server(i)->server()->scanner_manager()->ListScans();
+      scanner_count_by_ts_uuid->insert(
+          {cluster_->mini_tablet_server(i)->server()->instance_pb().permanent_uuid(),
+           static_cast<int32_t>(scanners.size())});
+    }
+  }
+
   shared_ptr<KuduClient> client_;
   unique_ptr<InternalMiniCluster> cluster_;
 };
@@ -1475,5 +1564,50 @@
   ASSERT_EQ(KuduScanner::READ_YOUR_WRITES, sc.read_mode());
 }
 
+class ReplicaSelectionTest : public ScanTokenTest,
+                             public ::testing::WithParamInterface<KuduClient::ReplicaSelection> {};
+
+INSTANTIATE_TEST_SUITE_P(PickServer,
+                         ReplicaSelectionTest,
+                         ::testing::Values(KuduClient::ReplicaSelection::LEADER_ONLY,
+                                           KuduClient::ReplicaSelection::CLOSEST_REPLICA,
+                                           KuduClient::ReplicaSelection::FIRST_REPLICA));
+
+// TODO(duyuqi)
+// Using location assignment to test replica selection for ScanToken, refer to:
+//   src/kudu/integration-tests/location_assignment-itest.cc#L76-L150
+//
+// This unit test checks whether LEADER_ONLY/CLOSEST_REPLICA/FIRST_REPLICA replica selection works
+// as expected.
+TEST_P(ReplicaSelectionTest, ReplicaSelection) {
+  shared_ptr<KuduTable> table;
+  map<string, int32_t> replica_num_by_ts_uuid;
+  vector<string> tablet_ids;
+  auto replica_selection = GetParam();
+  PrepareEnvForTestReplicaSelection(&table, &tablet_ids);
+  GetSelectedReplicaCount(tablet_ids, replica_selection, &replica_num_by_ts_uuid);
+
+  map<string, int32_t> scanner_count_by_ts_uuid;
+  GetScannerCount(&scanner_count_by_ts_uuid);
+  vector<KuduScanToken*> tokens;
+  ElementDeleter deleter(&tokens);
+  // Scan all the partitions by specific replica selection.
+  // Launch scan requests.
+  ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+  int64_t row_count = 0;
+  CountRowsSeq(client_.get(), tokens, &row_count, replica_selection);
+
+  int result = 0;
+  map<string, int32_t> now_scanner_count_by_ts_uuid;
+  GetScannerCount(&now_scanner_count_by_ts_uuid);
+  for (auto& ts_uuid_scanner_count : now_scanner_count_by_ts_uuid) {
+    const auto& permanent_uuid = ts_uuid_scanner_count.first;
+    ASSERT_EQ(replica_num_by_ts_uuid[permanent_uuid],
+              (ts_uuid_scanner_count.second - scanner_count_by_ts_uuid[permanent_uuid]));
+    result += replica_num_by_ts_uuid[permanent_uuid];
+  }
+  ASSERT_EQ(kBucketNum, result);
+}
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index 8117b65..2779d85 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -312,6 +312,8 @@
   //   - Replicas whose tablet server has the same location as the client
   //   - All other replicas
   CLOSEST_REPLICA = 2;
+  // Select the first replica in the list.
+  FIRST_REPLICA = 3;
 }
 
 // The serialized format of a Kudu table partition schema.
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 5ed43d0..614cce8 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -170,11 +170,14 @@
 }
 
 constexpr const char* const kReplicaSelectionClosest = "closest";
+constexpr const char* const kReplicaSelectionFirst = "first";
 constexpr const char* const kReplicaSelectionLeader = "leader";
+
 bool ValidateReplicaSelection(const char* flag_name,
                               const string& flag_value) {
   static const vector<string> kReplicaSelections = {
     kReplicaSelectionClosest,
+    kReplicaSelectionFirst,
     kReplicaSelectionLeader,
   };
   return IsFlagValueAcceptable(flag_name, flag_value, kReplicaSelections);
@@ -637,7 +640,7 @@
 }
 
 Status TableScanner::SetReplicaSelection(const string& selection_str) {
-  KuduClient::ReplicaSelection selection;
+  KuduClient::ReplicaSelection selection = KuduClient::ReplicaSelection::CLOSEST_REPLICA;
   RETURN_NOT_OK(ParseReplicaSelection(selection_str, &selection));
   replica_selection_ = selection;
   return Status::OK();
@@ -799,6 +802,8 @@
     *selection = KuduClient::ReplicaSelection::CLOSEST_REPLICA;
   } else if (iequals(kReplicaSelectionLeader, selection_str)) {
     *selection = KuduClient::ReplicaSelection::LEADER_ONLY;
+  } else if (iequals(kReplicaSelectionFirst, selection_str)) {
+    *selection = KuduClient::ReplicaSelection::FIRST_REPLICA;
   } else {
     return Status::InvalidArgument("invalid replica selection", selection_str);
   }