KUDU-3393  C++ client support split a tablet to mutil ranges and concurrent scan data

I add a param for build 'KuduScanToken', like this :
`
KuduScanTokenBuilder builder(table);
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
// set splitSizeBytes
builder.SetSplitSizeBytes(1000);
ASSERT_OK(builder.Build(&tokens));
`
The default value of split_size_bytes is 0, and this means we don't split the key range
for a tablet.
If the value of split_size_bytes is nonzero, we will try to send a SplitKeyRangeRPC to
tservers. We may get more tokens than tablets num and the more tokens will help us scan faster.

Change-Id: I207f9584cd558d32fcd9e8de7d6c25e517377272
Reviewed-on: http://gerrit.cloudera.org:8080/18945
Tested-by: Kudu Jenkins
Reviewed-by: Yingchun Lai <acelyc1112009@gmail.com>
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index b7d5412..492a6e9 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -141,6 +141,7 @@
 DECLARE_bool(catalog_manager_support_live_row_count);
 DECLARE_bool(catalog_manager_support_on_disk_size);
 DECLARE_bool(client_use_unix_domain_sockets);
+DECLARE_bool(enable_rowset_compaction);
 DECLARE_bool(enable_txn_system_client_init);
 DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(location_mapping_by_uuid);
@@ -217,6 +218,7 @@
 using kudu::tablet::TabletReplica;
 using kudu::transactions::TxnTokenPB;
 using kudu::tserver::MiniTabletServer;
+using std::atomic;
 using std::function;
 using std::map;
 using std::nullopt;
@@ -400,6 +402,68 @@
     }
   }
 
+  void CheckTokensInfo(const vector<KuduScanToken*>& tokens,
+                       int replica_num = 1) {
+    for (const auto* t : tokens) {
+      const KuduTablet& tablet = t->tablet();
+      ASSERT_EQ(replica_num, tablet.replicas().size());
+      const KuduReplica* replica = tablet.replicas()[0];
+      ASSERT_TRUE(replica->is_leader());
+      const MiniTabletServer* ts = cluster_->mini_tablet_server(0);
+      ASSERT_EQ(ts->server()->instance_pb().permanent_uuid(),
+          replica->ts().uuid());
+      ASSERT_EQ(ts->bound_rpc_addr().host(), replica->ts().hostname());
+      ASSERT_EQ(ts->bound_rpc_addr().port(), replica->ts().port());
+
+      unique_ptr<KuduTablet> tablet_copy;
+      {
+        KuduTablet* ptr;
+        ASSERT_OK(client_->GetTablet(tablet.id(), &ptr));
+        tablet_copy.reset(ptr);
+      }
+      ASSERT_EQ(tablet.id(), tablet_copy->id());
+      ASSERT_EQ(1, tablet_copy->replicas().size());
+      const KuduReplica* replica_copy = tablet_copy->replicas()[0];
+
+      ASSERT_EQ(replica->is_leader(), replica_copy->is_leader());
+      ASSERT_EQ(replica->ts().uuid(), replica_copy->ts().uuid());
+      ASSERT_EQ(replica->ts().hostname(), replica_copy->ts().hostname());
+      ASSERT_EQ(replica->ts().port(), replica_copy->ts().port());
+    }
+  }
+
+  int CountRows(const vector<KuduScanToken*>& tokens) {
+    atomic<uint32_t> rows(0);
+    vector<thread> threads;
+    for (KuduScanToken* token : tokens) {
+      string buf;
+      CHECK_OK(token->Serialize(&buf));
+
+      threads.emplace_back([this, &rows] (const string& serialized_token) {
+        shared_ptr<KuduClient> client;
+        ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+        KuduScanner* scanner_ptr;
+        ASSERT_OK(KuduScanToken::DeserializeIntoScanner(
+            client.get(), serialized_token, &scanner_ptr));
+        unique_ptr<KuduScanner> scanner(scanner_ptr);
+        ASSERT_OK(scanner->Open());
+
+        while (scanner->HasMoreRows()) {
+          KuduScanBatch batch;
+          ASSERT_OK(scanner->NextBatch(&batch));
+          rows += batch.NumRows();
+        }
+        scanner->Close();
+      }, std::move(buf));
+    }
+
+    for (thread& thread : threads) {
+      thread.join();
+    }
+
+    return rows;
+  }
+
   // Return the number of lookup-related RPCs which have been serviced by the master.
   int CountMasterLookupRPCs() const {
     auto ent = cluster_->mini_master()->master()->metric_entity();
@@ -8539,6 +8603,171 @@
   }
 }
 
+class TableKeyRangeTest : public ClientTest {
+ public:
+   Status BuildSchema() override {
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+    b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
+    b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->Nullable();
+    return b.Build(&schema_);
+  }
+
+  void SetUp() override {
+    ClientTest::SetUp();
+
+    // Set a low flush threshold so we can scan a mix of flushed data in
+    // in-memory data.
+    FLAGS_flush_threshold_mb = 0;
+    FLAGS_flush_threshold_secs = 1;
+
+    // Disable rowset compact to prevent DRSs being merged because they are too small.
+    FLAGS_enable_rowset_compaction = false;
+
+    ASSERT_OK(CreateTable(kTableName, 1, {}, GeneratePartialRows(), &client_table_));
+  }
+
+  typedef vector<pair<unique_ptr<KuduPartialRow>, unique_ptr<KuduPartialRow>>> KuduPartialRowsVec;
+  KuduPartialRowsVec GeneratePartialRows() const {
+    KuduPartialRowsVec rows;
+    vector<int> keys = { 0, 250, 500, 750 };
+    for (int i = 0; i < keys.size(); i++) {
+      unique_ptr<KuduPartialRow> lower_bound(schema_.NewRow());
+      CHECK_OK(lower_bound->SetInt32("key", keys[i]));
+      unique_ptr<KuduPartialRow> upper_bound(schema_.NewRow());
+      CHECK_OK(upper_bound->SetInt32("key", keys[i] + 250));
+      rows.emplace_back(lower_bound.release(), upper_bound.release());
+    }
+
+    return rows;
+  }
+
+  static void InsertTestRowsWithStrings(KuduTable* table, KuduSession* session, int num_rows) {
+    vector<int> keys = { 0, 250, 500, 750 };
+    string str_val = "*";
+    int diff_value = 120; // use to create discontinuous data in a tablet
+    for (int k = 0; k < keys.size(); k++) {
+      for (int i = keys[k]; i < keys[k] + num_rows; i++) {
+        unique_ptr<KuduInsert> insert(table->NewInsert());
+        ASSERT_OK(insert->mutable_row()->SetInt32("key", i));
+        ASSERT_OK(insert->mutable_row()->SetInt32("int_val", i * 2));
+        ASSERT_OK(insert->mutable_row()->SetString("string_val", str_val));
+        ASSERT_OK(session->Apply(insert.release()));
+        ASSERT_OK(session->Flush());
+      }
+      for (int i = keys[k] + diff_value; i < keys[k] + diff_value + num_rows; i++) {
+        unique_ptr<KuduInsert> insert(table->NewInsert());
+        ASSERT_OK(insert->mutable_row()->SetInt32("key", i));
+        ASSERT_OK(insert->mutable_row()->SetInt32("int_val", i * 2));
+        ASSERT_OK(insert->mutable_row()->SetString("string_val", str_val));
+        ASSERT_OK(session->Apply(insert.release()));
+        ASSERT_OK(session->Flush());
+      }
+    }
+  }
+
+ protected:
+  static constexpr const char* const kTableName = "client-testrange";
+
+  shared_ptr<KuduTable> range_table_;
+};
+
+TEST_F(TableKeyRangeTest, TestGetTableKeyRange) {
+  client::sp::shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(kTableName, &table));
+  {
+    // Create session
+    shared_ptr<KuduSession> session = client_->NewSession();
+    session->SetTimeoutMillis(10000);
+    ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+    // Should have no rows to begin with.
+    ASSERT_EQ(0, CountRowsFromClient(table.get()));
+    // Insert rows
+    NO_FATALS(InsertTestRowsWithStrings(client_table_.get(), session.get(), 100));
+    NO_FATALS(CheckNoRpcOverflow());
+  }
+
+  {
+    // search meta cache by default
+    //
+    // There are tablet information in the meta cache.
+    // We give priority to the data in the cache by default.
+    KuduScanTokenBuilder builder(table.get());
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    ASSERT_OK(builder.Build(&tokens));
+    ASSERT_EQ(4, tokens.size());
+
+    NO_FATALS(CheckTokensInfo(tokens));
+    ASSERT_EQ(800, CountRows(tokens));
+  }
+
+  {
+    // search meta cache by local
+    //
+    // If the splitSizeBytes set to 0 , we search the meta cache.
+    KuduScanTokenBuilder builder(table.get());
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    // set splitSizeBytes to 0
+    builder.SetSplitSizeBytes(0);
+    ASSERT_OK(builder.Build(&tokens));
+    ASSERT_EQ(4, tokens.size());
+
+    NO_FATALS(CheckTokensInfo(tokens));
+    ASSERT_EQ(800, CountRows(tokens));
+  }
+
+  uint32_t token_size_a = 0;
+  {
+    // search from tserver
+    KuduScanTokenBuilder builder(table.get());
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    // set splitSizeBytes < tablet's size
+    builder.SetSplitSizeBytes(700);
+    ASSERT_OK(builder.Build(&tokens));
+    token_size_a = tokens.size();
+    ASSERT_LT(4, token_size_a);
+
+    NO_FATALS(CheckTokensInfo(tokens));
+    ASSERT_EQ(800, CountRows(tokens));
+  }
+
+  uint32_t token_size_b = 0;
+  {
+    // search from tserver
+    KuduScanTokenBuilder builder(table.get());
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    // set splitSizeBytes < tablet's size
+    builder.SetSplitSizeBytes(20);
+    ASSERT_OK(builder.Build(&tokens));
+    token_size_b = tokens.size();
+    ASSERT_LT(4, token_size_b);
+
+    NO_FATALS(CheckTokensInfo(tokens));
+    ASSERT_EQ(800, CountRows(tokens));
+  }
+
+  // diffferent splitSizeBytes leads to different token
+  ASSERT_NE(token_size_a, token_size_b);
+
+  {
+    // search from tserver
+    KuduScanTokenBuilder builder(table.get());
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    // set splitSizeBytes > tablet's size
+    builder.SetSplitSizeBytes(1024 * 1024 * 1024);
+    ASSERT_OK(builder.Build(&tokens));
+    ASSERT_EQ(tokens.size(), 4);
+
+    NO_FATALS(CheckTokensInfo(tokens));
+    ASSERT_EQ(800, CountRows(tokens));
+  }
+}
 class ClientTxnManagerProxyTest : public ClientTest {
  public:
   void SetUp() override {
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index f47f0ef..818bb27 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -2306,6 +2306,10 @@
   return data_->Build(tokens);
 }
 
+void KuduScanTokenBuilder::SetSplitSizeBytes(uint64_t split_size_bytes) {
+  return data_->SplitSizeBytes(split_size_bytes);
+}
+
 ////////////////////////////////////////////////////////////
 // KuduReplica
 ////////////////////////////////////////////////////////////
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 834d77d..82f3934 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -3358,6 +3358,11 @@
   /// @return Operation result status.
   Status Build(std::vector<KuduScanToken*>* tokens) WARN_UNUSED_RESULT;
 
+  /// Set the size of the data in each key range.
+  /// The default value is 0 without set and tokens build by meta cache.
+  /// It's corresponding to 'setSplitSizeBytes' in Java client.
+  void SetSplitSizeBytes(uint64_t split_size_bytes);
+
  private:
   class KUDU_NO_EXPORT Data;
 
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 948de02..f47b1fd 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -24,6 +24,7 @@
 #include <ostream>
 #include <set>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -36,6 +37,7 @@
 #include "kudu/client/master_proxy_rpc.h"
 #include "kudu/client/schema.h"
 #include "kudu/common/common.pb.h"
+#include "kudu/common/key_range.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/gutil/basictypes.h"
@@ -44,11 +46,13 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
-#include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/async_util.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/net/dns_resolver.h"
@@ -70,6 +74,8 @@
 using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::BackoffType;
 using kudu::rpc::CredentialsPolicy;
+using kudu::rpc::RpcController;
+using kudu::security::SignedTokenPB;
 using kudu::tserver::TabletServerAdminServiceProxy;
 using kudu::tserver::TabletServerServiceProxy;
 using std::set;
@@ -1311,6 +1317,83 @@
   entry_by_tablet_id_.clear();
 }
 
+Status MetaCache::GetTableKeyRanges(const KuduTable* table,
+                                    const PartitionKey& partition_key,
+                                    LookupType lookup_type,
+                                    uint64_t split_size_bytes,
+                                    const MonoDelta& timeout,
+                                    vector<RangeWithRemoteTablet>* range_tablets) {
+  scoped_refptr<internal::RemoteTablet> tablet;
+  Synchronizer sync;
+  MonoTime deadline = MonoTime::Now() + timeout;
+  LookupTabletByKey(table,
+                    partition_key,
+                    deadline,
+                    lookup_type,
+                    &tablet,
+                    sync.AsStatusCallback());
+  RETURN_NOT_OK(sync.Wait());
+
+  if (split_size_bytes == 0) {
+    KeyRange key_range(
+        tablet->partition().begin().ToString(),
+        tablet->partition().end().ToString(),
+        split_size_bytes);
+    range_tablets->emplace_back(key_range, tablet);
+    return Status::OK();
+  }
+  DCHECK_GT(split_size_bytes, 0);
+
+  RemoteTabletServer *ts;
+  vector<RemoteTabletServer*> candidates;
+  set<string> blacklist;
+  RETURN_NOT_OK(table->client()->data_->GetTabletServer(table->client(),
+                                                        tablet,
+                                                        KuduClient::LEADER_ONLY,
+                                                        blacklist,
+                                                        &candidates,
+                                                        &ts));
+  CHECK(ts);
+  CHECK(ts->proxy());
+  auto proxy = ts->proxy();
+
+  tserver::SplitKeyRangeRequestPB req;
+  tserver::SplitKeyRangeResponsePB resp;
+  req.set_tablet_id(tablet->tablet_id());
+  if (!tablet->partition().begin().ToString().empty()) {
+    req.set_start_primary_key(tablet->partition().begin().ToString());
+  }
+  if (!tablet->partition().end().ToString().empty()) {
+    req.set_stop_primary_key(tablet->partition().end().ToString());
+  }
+  req.set_target_chunk_size_bytes(split_size_bytes);
+  SignedTokenPB authz_token;
+  if (table->client()->data_->FetchCachedAuthzToken(table->id(), &authz_token)) {
+    *req.mutable_authz_token() = std::move(authz_token);
+  } else {
+    // Note: this is expected if attempting to connect to a cluster that does
+    // not support fine-grained access control.
+    VLOG(1) << "no authz token for table " << table->id();
+  }
+
+  RpcController rpc;
+  rpc.set_timeout(timeout);
+  RETURN_NOT_OK(proxy->SplitKeyRange(req, &resp, &rpc));
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+
+  for (const auto& range : resp.ranges()) {
+    KeyRange key_range(
+        range.has_start_primary_key() ? range.start_primary_key() : "",
+        range.has_stop_primary_key() ? range.stop_primary_key() : "",
+        range.size_bytes_estimates());
+    range_tablets->emplace_back(key_range, tablet);
+  }
+
+  return Status::OK();
+}
+
 void MetaCache::LookupTabletByKey(const KuduTable* table,
                                   PartitionKey partition_key,
                                   const MonoTime& deadline,
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index c93d2ba..0904266 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -18,6 +18,8 @@
 // This module is internal to the client and not a public API.
 #pragma once
 
+#include <cstdint>
+
 #include <atomic>
 #include <map>
 #include <memory>
@@ -32,6 +34,7 @@
 #include <gtest/gtest_prod.h>
 
 #include "kudu/client/replica_controller-internal.h"
+#include "kudu/common/key_range.h"
 #include "kudu/common/partition.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/gutil/macros.h"
@@ -406,6 +409,18 @@
     kLowerBound
   };
 
+  struct RangeWithRemoteTablet {
+    RangeWithRemoteTablet(KeyRange krange,
+                          const scoped_refptr<internal::RemoteTablet>& rtablet)
+        : key_range(std::move(krange)),
+          remote_tablet(rtablet) {}
+
+    KeyRange key_range;
+    scoped_refptr<internal::RemoteTablet> remote_tablet;
+  };
+
+  typedef std::map<PartitionKey, MetaCacheEntry> TabletMap;
+
   // Look up which tablet hosts the given partition key for a table. When it is
   // available, the tablet is stored in 'remote_tablet' (if not NULL) and the
   // callback is fired. Only tablets with non-failed LEADERs are considered.
@@ -422,6 +437,17 @@
                          scoped_refptr<RemoteTablet>* remote_tablet,
                          const StatusCallback& callback);
 
+  // Look up which tablet hosts the given partition key for a table.
+  // If @split_size_bytes set nonzero, send SplitKeyRangeRPC to remote tserver,
+  // otherwise search only occurs locally.
+  // The result stored in 'range_tablets'.
+  Status GetTableKeyRanges(const KuduTable* table,
+                           const PartitionKey& partition_key,
+                           LookupType lookup_type,
+                           uint64_t split_size_bytes,
+                           const MonoDelta& timeout,
+                           std::vector<RangeWithRemoteTablet>* range_tablets);
+
   // Look up the locations of the given tablet, storing the result in
   // 'remote_tablet' if not null, and calling 'lookup_complete_cb' once the
   // lookup is complete. Only tablets with non-failed LEADERs are considered.
@@ -551,7 +577,6 @@
   // for key-based lookups.
   //
   // Protected by lock_.
-  typedef std::map<PartitionKey, MetaCacheEntry> TabletMap;
   std::unordered_map<std::string, TabletMap> tablets_by_table_and_key_;
 
   // Cache entries for tablets, keyed by tablet ID, used for ID-based lookups.
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 434ce58..91360e6 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -17,12 +17,12 @@
 
 #include "kudu/client/scan_token-internal.h"
 
-#include <cstdint>
 #include <map>
 #include <memory>
 #include <optional>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -40,8 +40,10 @@
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/client/tablet-internal.h"
 #include "kudu/client/tablet_server-internal.h"
+#include "kudu/common/column_predicate.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/encoded_key.h"
+#include "kudu/common/key_range.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/partition_pruner.h"
 #include "kudu/common/scan_spec.h"
@@ -54,7 +56,6 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/security/token.pb.h"
-#include "kudu/util/async_util.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/slice.h"
@@ -68,7 +69,6 @@
 
 namespace kudu {
 
-class ColumnPredicate;
 using master::GetTableLocationsResponsePB;
 using master::TableIdentifierPB;
 using master::TabletLocationsPB;
@@ -432,36 +432,51 @@
     pb.set_batch_size_bytes(configuration_.batch_size_bytes());
   }
 
-  MonoTime deadline = MonoTime::Now() + client->default_admin_operation_timeout();
-
   PartitionPruner pruner;
+  vector<MetaCache::RangeWithRemoteTablet> range_tablets;
   pruner.Init(*table->schema().schema_, table->partition_schema(), configuration_.spec());
   while (pruner.HasMorePartitionKeyRanges()) {
-    scoped_refptr<internal::RemoteTablet> tablet;
-    Synchronizer sync;
+    PartitionKey key_range;
+    vector<MetaCache::RangeWithRemoteTablet> tmp_range_tablets;
     const auto& partition_key = pruner.NextPartitionKey();
-    client->data_->meta_cache_->LookupTabletByKey(table,
-                                                  partition_key,
-                                                  deadline,
-                                                  MetaCache::LookupType::kLowerBound,
-                                                  &tablet,
-                                                  sync.AsStatusCallback());
-    const auto s = sync.Wait();
+    Status s = client->data_->meta_cache_->GetTableKeyRanges(
+        table,
+        partition_key,
+        MetaCache::LookupType::kLowerBound,
+        split_size_bytes_,
+        client->default_rpc_timeout(),
+        &tmp_range_tablets);
+
     if (s.IsNotFound()) {
-      // No more tablets in the table.
       pruner.RemovePartitionKeyRange({});
       continue;
     }
     RETURN_NOT_OK(s);
 
-    // Check if the meta cache returned a tablet covering a partition key range past
-    // what we asked for. This can happen if the requested partition key falls
-    // in a non-covered range. In this case we can potentially prune the tablet.
-    if (partition_key < tablet->partition().begin() &&
-        pruner.ShouldPrune(tablet->partition())) {
-      pruner.RemovePartitionKeyRange(tablet->partition().end());
-      continue;
+    if (tmp_range_tablets.empty()) {
+      pruner.RemovePartitionKeyRange(partition_key);
+    } else {
+      // If split_size_bytes_ set to zero, we just do search in meta cache.
+      // Check if the meta cache returned a tablet covering a partition key range past
+      // what we asked for. This can happen if the requested partition key falls
+      // in a non-covered range. In this case we can potentially prune the tablet.
+      if (split_size_bytes_ == 0 &&
+          partition_key < tmp_range_tablets.back().remote_tablet->partition().begin() &&
+          pruner.ShouldPrune(tmp_range_tablets.back().remote_tablet->partition())) {
+        pruner.RemovePartitionKeyRange(tmp_range_tablets.back().remote_tablet->partition().end());
+        continue;
+      }
+      for (auto& range_tablet : tmp_range_tablets) {
+        range_tablets.push_back(range_tablet);
+      }
+      pruner.RemovePartitionKeyRange(tmp_range_tablets.back().remote_tablet->partition().end());
     }
+  }
+
+  for (const auto& range_tablet : range_tablets) {
+    const auto& range = range_tablet.key_range;
+    const auto& tablet = range_tablet.remote_tablet;
+
     vector<internal::RemoteReplica> replicas;
     tablet->GetRemoteReplicas(&replicas);
 
@@ -499,6 +514,15 @@
     message.CopyFrom(pb);
     message.set_lower_bound_partition_key(tablet->partition().begin().ToString());
     message.set_upper_bound_partition_key(tablet->partition().end().ToString());
+    if (!range.start_primary_key().empty() && split_size_bytes_) {
+      message.clear_lower_bound_primary_key();
+      message.set_lower_bound_primary_key(range.start_primary_key());
+    }
+    if (!range.stop_primary_key().empty() && split_size_bytes_) {
+      message.clear_upper_bound_primary_key();
+      message.set_upper_bound_primary_key(range.stop_primary_key());
+    }
+
 
     // Set the tablet metadata so that a call to the master is not needed to
     // locate the tablet to scan when opening the scanner.
@@ -557,8 +581,8 @@
                                 std::move(message),
                                 std::move(client_tablet));
     tokens->push_back(client_scan_token.release());
-    pruner.RemovePartitionKeyRange(tablet->partition().end());
   }
+
   return Status::OK();
 }
 
diff --git a/src/kudu/client/scan_token-internal.h b/src/kudu/client/scan_token-internal.h
index 6296da0..97d4640 100644
--- a/src/kudu/client/scan_token-internal.h
+++ b/src/kudu/client/scan_token-internal.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <cstdint>
+
 #include <memory>
 #include <string>
 #include <vector>
@@ -80,10 +82,15 @@
     include_tablet_metadata_ = include_metadata;
   }
 
+  void SplitSizeBytes(uint64_t split_size_bytes) {
+    split_size_bytes_ = split_size_bytes;
+  }
+
 private:
   ScanConfiguration configuration_;
   bool include_table_metadata_ = true;
   bool include_tablet_metadata_ = true;
+  uint64_t split_size_bytes_ = 0;
 };
 
 } // namespace client