KUDU-3393 C++ client support split a tablet to mutil ranges and concurrent scan data
I add a param for build 'KuduScanToken', like this :
`
KuduScanTokenBuilder builder(table);
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
// set splitSizeBytes
builder.SetSplitSizeBytes(1000);
ASSERT_OK(builder.Build(&tokens));
`
The default value of split_size_bytes is 0, and this means we don't split the key range
for a tablet.
If the value of split_size_bytes is nonzero, we will try to send a SplitKeyRangeRPC to
tservers. We may get more tokens than tablets num and the more tokens will help us scan faster.
Change-Id: I207f9584cd558d32fcd9e8de7d6c25e517377272
Reviewed-on: http://gerrit.cloudera.org:8080/18945
Tested-by: Kudu Jenkins
Reviewed-by: Yingchun Lai <acelyc1112009@gmail.com>
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index b7d5412..492a6e9 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -141,6 +141,7 @@
DECLARE_bool(catalog_manager_support_live_row_count);
DECLARE_bool(catalog_manager_support_on_disk_size);
DECLARE_bool(client_use_unix_domain_sockets);
+DECLARE_bool(enable_rowset_compaction);
DECLARE_bool(enable_txn_system_client_init);
DECLARE_bool(fail_dns_resolution);
DECLARE_bool(location_mapping_by_uuid);
@@ -217,6 +218,7 @@
using kudu::tablet::TabletReplica;
using kudu::transactions::TxnTokenPB;
using kudu::tserver::MiniTabletServer;
+using std::atomic;
using std::function;
using std::map;
using std::nullopt;
@@ -400,6 +402,68 @@
}
}
+ void CheckTokensInfo(const vector<KuduScanToken*>& tokens,
+ int replica_num = 1) {
+ for (const auto* t : tokens) {
+ const KuduTablet& tablet = t->tablet();
+ ASSERT_EQ(replica_num, tablet.replicas().size());
+ const KuduReplica* replica = tablet.replicas()[0];
+ ASSERT_TRUE(replica->is_leader());
+ const MiniTabletServer* ts = cluster_->mini_tablet_server(0);
+ ASSERT_EQ(ts->server()->instance_pb().permanent_uuid(),
+ replica->ts().uuid());
+ ASSERT_EQ(ts->bound_rpc_addr().host(), replica->ts().hostname());
+ ASSERT_EQ(ts->bound_rpc_addr().port(), replica->ts().port());
+
+ unique_ptr<KuduTablet> tablet_copy;
+ {
+ KuduTablet* ptr;
+ ASSERT_OK(client_->GetTablet(tablet.id(), &ptr));
+ tablet_copy.reset(ptr);
+ }
+ ASSERT_EQ(tablet.id(), tablet_copy->id());
+ ASSERT_EQ(1, tablet_copy->replicas().size());
+ const KuduReplica* replica_copy = tablet_copy->replicas()[0];
+
+ ASSERT_EQ(replica->is_leader(), replica_copy->is_leader());
+ ASSERT_EQ(replica->ts().uuid(), replica_copy->ts().uuid());
+ ASSERT_EQ(replica->ts().hostname(), replica_copy->ts().hostname());
+ ASSERT_EQ(replica->ts().port(), replica_copy->ts().port());
+ }
+ }
+
+ int CountRows(const vector<KuduScanToken*>& tokens) {
+ atomic<uint32_t> rows(0);
+ vector<thread> threads;
+ for (KuduScanToken* token : tokens) {
+ string buf;
+ CHECK_OK(token->Serialize(&buf));
+
+ threads.emplace_back([this, &rows] (const string& serialized_token) {
+ shared_ptr<KuduClient> client;
+ ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+ KuduScanner* scanner_ptr;
+ ASSERT_OK(KuduScanToken::DeserializeIntoScanner(
+ client.get(), serialized_token, &scanner_ptr));
+ unique_ptr<KuduScanner> scanner(scanner_ptr);
+ ASSERT_OK(scanner->Open());
+
+ while (scanner->HasMoreRows()) {
+ KuduScanBatch batch;
+ ASSERT_OK(scanner->NextBatch(&batch));
+ rows += batch.NumRows();
+ }
+ scanner->Close();
+ }, std::move(buf));
+ }
+
+ for (thread& thread : threads) {
+ thread.join();
+ }
+
+ return rows;
+ }
+
// Return the number of lookup-related RPCs which have been serviced by the master.
int CountMasterLookupRPCs() const {
auto ent = cluster_->mini_master()->master()->metric_entity();
@@ -8539,6 +8603,171 @@
}
}
+class TableKeyRangeTest : public ClientTest {
+ public:
+ Status BuildSchema() override {
+ KuduSchemaBuilder b;
+ b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+ b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
+ b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->Nullable();
+ return b.Build(&schema_);
+ }
+
+ void SetUp() override {
+ ClientTest::SetUp();
+
+ // Set a low flush threshold so we can scan a mix of flushed data in
+ // in-memory data.
+ FLAGS_flush_threshold_mb = 0;
+ FLAGS_flush_threshold_secs = 1;
+
+ // Disable rowset compact to prevent DRSs being merged because they are too small.
+ FLAGS_enable_rowset_compaction = false;
+
+ ASSERT_OK(CreateTable(kTableName, 1, {}, GeneratePartialRows(), &client_table_));
+ }
+
+ typedef vector<pair<unique_ptr<KuduPartialRow>, unique_ptr<KuduPartialRow>>> KuduPartialRowsVec;
+ KuduPartialRowsVec GeneratePartialRows() const {
+ KuduPartialRowsVec rows;
+ vector<int> keys = { 0, 250, 500, 750 };
+ for (int i = 0; i < keys.size(); i++) {
+ unique_ptr<KuduPartialRow> lower_bound(schema_.NewRow());
+ CHECK_OK(lower_bound->SetInt32("key", keys[i]));
+ unique_ptr<KuduPartialRow> upper_bound(schema_.NewRow());
+ CHECK_OK(upper_bound->SetInt32("key", keys[i] + 250));
+ rows.emplace_back(lower_bound.release(), upper_bound.release());
+ }
+
+ return rows;
+ }
+
+ static void InsertTestRowsWithStrings(KuduTable* table, KuduSession* session, int num_rows) {
+ vector<int> keys = { 0, 250, 500, 750 };
+ string str_val = "*";
+ int diff_value = 120; // use to create discontinuous data in a tablet
+ for (int k = 0; k < keys.size(); k++) {
+ for (int i = keys[k]; i < keys[k] + num_rows; i++) {
+ unique_ptr<KuduInsert> insert(table->NewInsert());
+ ASSERT_OK(insert->mutable_row()->SetInt32("key", i));
+ ASSERT_OK(insert->mutable_row()->SetInt32("int_val", i * 2));
+ ASSERT_OK(insert->mutable_row()->SetString("string_val", str_val));
+ ASSERT_OK(session->Apply(insert.release()));
+ ASSERT_OK(session->Flush());
+ }
+ for (int i = keys[k] + diff_value; i < keys[k] + diff_value + num_rows; i++) {
+ unique_ptr<KuduInsert> insert(table->NewInsert());
+ ASSERT_OK(insert->mutable_row()->SetInt32("key", i));
+ ASSERT_OK(insert->mutable_row()->SetInt32("int_val", i * 2));
+ ASSERT_OK(insert->mutable_row()->SetString("string_val", str_val));
+ ASSERT_OK(session->Apply(insert.release()));
+ ASSERT_OK(session->Flush());
+ }
+ }
+ }
+
+ protected:
+ static constexpr const char* const kTableName = "client-testrange";
+
+ shared_ptr<KuduTable> range_table_;
+};
+
+TEST_F(TableKeyRangeTest, TestGetTableKeyRange) {
+ client::sp::shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(kTableName, &table));
+ {
+ // Create session
+ shared_ptr<KuduSession> session = client_->NewSession();
+ session->SetTimeoutMillis(10000);
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+ // Should have no rows to begin with.
+ ASSERT_EQ(0, CountRowsFromClient(table.get()));
+ // Insert rows
+ NO_FATALS(InsertTestRowsWithStrings(client_table_.get(), session.get(), 100));
+ NO_FATALS(CheckNoRpcOverflow());
+ }
+
+ {
+ // search meta cache by default
+ //
+ // There are tablet information in the meta cache.
+ // We give priority to the data in the cache by default.
+ KuduScanTokenBuilder builder(table.get());
+ vector<KuduScanToken*> tokens;
+ ElementDeleter deleter(&tokens);
+ ASSERT_OK(builder.Build(&tokens));
+ ASSERT_EQ(4, tokens.size());
+
+ NO_FATALS(CheckTokensInfo(tokens));
+ ASSERT_EQ(800, CountRows(tokens));
+ }
+
+ {
+ // search meta cache by local
+ //
+ // If the splitSizeBytes set to 0 , we search the meta cache.
+ KuduScanTokenBuilder builder(table.get());
+ vector<KuduScanToken*> tokens;
+ ElementDeleter deleter(&tokens);
+ // set splitSizeBytes to 0
+ builder.SetSplitSizeBytes(0);
+ ASSERT_OK(builder.Build(&tokens));
+ ASSERT_EQ(4, tokens.size());
+
+ NO_FATALS(CheckTokensInfo(tokens));
+ ASSERT_EQ(800, CountRows(tokens));
+ }
+
+ uint32_t token_size_a = 0;
+ {
+ // search from tserver
+ KuduScanTokenBuilder builder(table.get());
+ vector<KuduScanToken*> tokens;
+ ElementDeleter deleter(&tokens);
+ // set splitSizeBytes < tablet's size
+ builder.SetSplitSizeBytes(700);
+ ASSERT_OK(builder.Build(&tokens));
+ token_size_a = tokens.size();
+ ASSERT_LT(4, token_size_a);
+
+ NO_FATALS(CheckTokensInfo(tokens));
+ ASSERT_EQ(800, CountRows(tokens));
+ }
+
+ uint32_t token_size_b = 0;
+ {
+ // search from tserver
+ KuduScanTokenBuilder builder(table.get());
+ vector<KuduScanToken*> tokens;
+ ElementDeleter deleter(&tokens);
+ // set splitSizeBytes < tablet's size
+ builder.SetSplitSizeBytes(20);
+ ASSERT_OK(builder.Build(&tokens));
+ token_size_b = tokens.size();
+ ASSERT_LT(4, token_size_b);
+
+ NO_FATALS(CheckTokensInfo(tokens));
+ ASSERT_EQ(800, CountRows(tokens));
+ }
+
+ // diffferent splitSizeBytes leads to different token
+ ASSERT_NE(token_size_a, token_size_b);
+
+ {
+ // search from tserver
+ KuduScanTokenBuilder builder(table.get());
+ vector<KuduScanToken*> tokens;
+ ElementDeleter deleter(&tokens);
+ // set splitSizeBytes > tablet's size
+ builder.SetSplitSizeBytes(1024 * 1024 * 1024);
+ ASSERT_OK(builder.Build(&tokens));
+ ASSERT_EQ(tokens.size(), 4);
+
+ NO_FATALS(CheckTokensInfo(tokens));
+ ASSERT_EQ(800, CountRows(tokens));
+ }
+}
class ClientTxnManagerProxyTest : public ClientTest {
public:
void SetUp() override {
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index f47f0ef..818bb27 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -2306,6 +2306,10 @@
return data_->Build(tokens);
}
+void KuduScanTokenBuilder::SetSplitSizeBytes(uint64_t split_size_bytes) {
+ return data_->SplitSizeBytes(split_size_bytes);
+}
+
////////////////////////////////////////////////////////////
// KuduReplica
////////////////////////////////////////////////////////////
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 834d77d..82f3934 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -3358,6 +3358,11 @@
/// @return Operation result status.
Status Build(std::vector<KuduScanToken*>* tokens) WARN_UNUSED_RESULT;
+ /// Set the size of the data in each key range.
+ /// The default value is 0 without set and tokens build by meta cache.
+ /// It's corresponding to 'setSplitSizeBytes' in Java client.
+ void SetSplitSizeBytes(uint64_t split_size_bytes);
+
private:
class KUDU_NO_EXPORT Data;
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 948de02..f47b1fd 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -24,6 +24,7 @@
#include <ostream>
#include <set>
#include <string>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -36,6 +37,7 @@
#include "kudu/client/master_proxy_rpc.h"
#include "kudu/client/schema.h"
#include "kudu/common/common.pb.h"
+#include "kudu/common/key_range.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/basictypes.h"
@@ -44,11 +46,13 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
-#include "kudu/rpc/response_callback.h"
#include "kudu/rpc/rpc.h"
#include "kudu/rpc/rpc_controller.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/async_util.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/net/dns_resolver.h"
@@ -70,6 +74,8 @@
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::BackoffType;
using kudu::rpc::CredentialsPolicy;
+using kudu::rpc::RpcController;
+using kudu::security::SignedTokenPB;
using kudu::tserver::TabletServerAdminServiceProxy;
using kudu::tserver::TabletServerServiceProxy;
using std::set;
@@ -1311,6 +1317,83 @@
entry_by_tablet_id_.clear();
}
+Status MetaCache::GetTableKeyRanges(const KuduTable* table,
+ const PartitionKey& partition_key,
+ LookupType lookup_type,
+ uint64_t split_size_bytes,
+ const MonoDelta& timeout,
+ vector<RangeWithRemoteTablet>* range_tablets) {
+ scoped_refptr<internal::RemoteTablet> tablet;
+ Synchronizer sync;
+ MonoTime deadline = MonoTime::Now() + timeout;
+ LookupTabletByKey(table,
+ partition_key,
+ deadline,
+ lookup_type,
+ &tablet,
+ sync.AsStatusCallback());
+ RETURN_NOT_OK(sync.Wait());
+
+ if (split_size_bytes == 0) {
+ KeyRange key_range(
+ tablet->partition().begin().ToString(),
+ tablet->partition().end().ToString(),
+ split_size_bytes);
+ range_tablets->emplace_back(key_range, tablet);
+ return Status::OK();
+ }
+ DCHECK_GT(split_size_bytes, 0);
+
+ RemoteTabletServer *ts;
+ vector<RemoteTabletServer*> candidates;
+ set<string> blacklist;
+ RETURN_NOT_OK(table->client()->data_->GetTabletServer(table->client(),
+ tablet,
+ KuduClient::LEADER_ONLY,
+ blacklist,
+ &candidates,
+ &ts));
+ CHECK(ts);
+ CHECK(ts->proxy());
+ auto proxy = ts->proxy();
+
+ tserver::SplitKeyRangeRequestPB req;
+ tserver::SplitKeyRangeResponsePB resp;
+ req.set_tablet_id(tablet->tablet_id());
+ if (!tablet->partition().begin().ToString().empty()) {
+ req.set_start_primary_key(tablet->partition().begin().ToString());
+ }
+ if (!tablet->partition().end().ToString().empty()) {
+ req.set_stop_primary_key(tablet->partition().end().ToString());
+ }
+ req.set_target_chunk_size_bytes(split_size_bytes);
+ SignedTokenPB authz_token;
+ if (table->client()->data_->FetchCachedAuthzToken(table->id(), &authz_token)) {
+ *req.mutable_authz_token() = std::move(authz_token);
+ } else {
+ // Note: this is expected if attempting to connect to a cluster that does
+ // not support fine-grained access control.
+ VLOG(1) << "no authz token for table " << table->id();
+ }
+
+ RpcController rpc;
+ rpc.set_timeout(timeout);
+ RETURN_NOT_OK(proxy->SplitKeyRange(req, &resp, &rpc));
+ if (resp.has_error()) {
+ return StatusFromPB(resp.error().status());
+ }
+
+ for (const auto& range : resp.ranges()) {
+ KeyRange key_range(
+ range.has_start_primary_key() ? range.start_primary_key() : "",
+ range.has_stop_primary_key() ? range.stop_primary_key() : "",
+ range.size_bytes_estimates());
+ range_tablets->emplace_back(key_range, tablet);
+ }
+
+ return Status::OK();
+}
+
void MetaCache::LookupTabletByKey(const KuduTable* table,
PartitionKey partition_key,
const MonoTime& deadline,
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index c93d2ba..0904266 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -18,6 +18,8 @@
// This module is internal to the client and not a public API.
#pragma once
+#include <cstdint>
+
#include <atomic>
#include <map>
#include <memory>
@@ -32,6 +34,7 @@
#include <gtest/gtest_prod.h>
#include "kudu/client/replica_controller-internal.h"
+#include "kudu/common/key_range.h"
#include "kudu/common/partition.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/macros.h"
@@ -406,6 +409,18 @@
kLowerBound
};
+ struct RangeWithRemoteTablet {
+ RangeWithRemoteTablet(KeyRange krange,
+ const scoped_refptr<internal::RemoteTablet>& rtablet)
+ : key_range(std::move(krange)),
+ remote_tablet(rtablet) {}
+
+ KeyRange key_range;
+ scoped_refptr<internal::RemoteTablet> remote_tablet;
+ };
+
+ typedef std::map<PartitionKey, MetaCacheEntry> TabletMap;
+
// Look up which tablet hosts the given partition key for a table. When it is
// available, the tablet is stored in 'remote_tablet' (if not NULL) and the
// callback is fired. Only tablets with non-failed LEADERs are considered.
@@ -422,6 +437,17 @@
scoped_refptr<RemoteTablet>* remote_tablet,
const StatusCallback& callback);
+ // Look up which tablet hosts the given partition key for a table.
+ // If @split_size_bytes set nonzero, send SplitKeyRangeRPC to remote tserver,
+ // otherwise search only occurs locally.
+ // The result stored in 'range_tablets'.
+ Status GetTableKeyRanges(const KuduTable* table,
+ const PartitionKey& partition_key,
+ LookupType lookup_type,
+ uint64_t split_size_bytes,
+ const MonoDelta& timeout,
+ std::vector<RangeWithRemoteTablet>* range_tablets);
+
// Look up the locations of the given tablet, storing the result in
// 'remote_tablet' if not null, and calling 'lookup_complete_cb' once the
// lookup is complete. Only tablets with non-failed LEADERs are considered.
@@ -551,7 +577,6 @@
// for key-based lookups.
//
// Protected by lock_.
- typedef std::map<PartitionKey, MetaCacheEntry> TabletMap;
std::unordered_map<std::string, TabletMap> tablets_by_table_and_key_;
// Cache entries for tablets, keyed by tablet ID, used for ID-based lookups.
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 434ce58..91360e6 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -17,12 +17,12 @@
#include "kudu/client/scan_token-internal.h"
-#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
+#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -40,8 +40,10 @@
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/tablet-internal.h"
#include "kudu/client/tablet_server-internal.h"
+#include "kudu/common/column_predicate.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
+#include "kudu/common/key_range.h"
#include "kudu/common/partition.h"
#include "kudu/common/partition_pruner.h"
#include "kudu/common/scan_spec.h"
@@ -54,7 +56,6 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/security/token.pb.h"
-#include "kudu/util/async_util.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/slice.h"
@@ -68,7 +69,6 @@
namespace kudu {
-class ColumnPredicate;
using master::GetTableLocationsResponsePB;
using master::TableIdentifierPB;
using master::TabletLocationsPB;
@@ -432,36 +432,51 @@
pb.set_batch_size_bytes(configuration_.batch_size_bytes());
}
- MonoTime deadline = MonoTime::Now() + client->default_admin_operation_timeout();
-
PartitionPruner pruner;
+ vector<MetaCache::RangeWithRemoteTablet> range_tablets;
pruner.Init(*table->schema().schema_, table->partition_schema(), configuration_.spec());
while (pruner.HasMorePartitionKeyRanges()) {
- scoped_refptr<internal::RemoteTablet> tablet;
- Synchronizer sync;
+ PartitionKey key_range;
+ vector<MetaCache::RangeWithRemoteTablet> tmp_range_tablets;
const auto& partition_key = pruner.NextPartitionKey();
- client->data_->meta_cache_->LookupTabletByKey(table,
- partition_key,
- deadline,
- MetaCache::LookupType::kLowerBound,
- &tablet,
- sync.AsStatusCallback());
- const auto s = sync.Wait();
+ Status s = client->data_->meta_cache_->GetTableKeyRanges(
+ table,
+ partition_key,
+ MetaCache::LookupType::kLowerBound,
+ split_size_bytes_,
+ client->default_rpc_timeout(),
+ &tmp_range_tablets);
+
if (s.IsNotFound()) {
- // No more tablets in the table.
pruner.RemovePartitionKeyRange({});
continue;
}
RETURN_NOT_OK(s);
- // Check if the meta cache returned a tablet covering a partition key range past
- // what we asked for. This can happen if the requested partition key falls
- // in a non-covered range. In this case we can potentially prune the tablet.
- if (partition_key < tablet->partition().begin() &&
- pruner.ShouldPrune(tablet->partition())) {
- pruner.RemovePartitionKeyRange(tablet->partition().end());
- continue;
+ if (tmp_range_tablets.empty()) {
+ pruner.RemovePartitionKeyRange(partition_key);
+ } else {
+ // If split_size_bytes_ set to zero, we just do search in meta cache.
+ // Check if the meta cache returned a tablet covering a partition key range past
+ // what we asked for. This can happen if the requested partition key falls
+ // in a non-covered range. In this case we can potentially prune the tablet.
+ if (split_size_bytes_ == 0 &&
+ partition_key < tmp_range_tablets.back().remote_tablet->partition().begin() &&
+ pruner.ShouldPrune(tmp_range_tablets.back().remote_tablet->partition())) {
+ pruner.RemovePartitionKeyRange(tmp_range_tablets.back().remote_tablet->partition().end());
+ continue;
+ }
+ for (auto& range_tablet : tmp_range_tablets) {
+ range_tablets.push_back(range_tablet);
+ }
+ pruner.RemovePartitionKeyRange(tmp_range_tablets.back().remote_tablet->partition().end());
}
+ }
+
+ for (const auto& range_tablet : range_tablets) {
+ const auto& range = range_tablet.key_range;
+ const auto& tablet = range_tablet.remote_tablet;
+
vector<internal::RemoteReplica> replicas;
tablet->GetRemoteReplicas(&replicas);
@@ -499,6 +514,15 @@
message.CopyFrom(pb);
message.set_lower_bound_partition_key(tablet->partition().begin().ToString());
message.set_upper_bound_partition_key(tablet->partition().end().ToString());
+ if (!range.start_primary_key().empty() && split_size_bytes_) {
+ message.clear_lower_bound_primary_key();
+ message.set_lower_bound_primary_key(range.start_primary_key());
+ }
+ if (!range.stop_primary_key().empty() && split_size_bytes_) {
+ message.clear_upper_bound_primary_key();
+ message.set_upper_bound_primary_key(range.stop_primary_key());
+ }
+
// Set the tablet metadata so that a call to the master is not needed to
// locate the tablet to scan when opening the scanner.
@@ -557,8 +581,8 @@
std::move(message),
std::move(client_tablet));
tokens->push_back(client_scan_token.release());
- pruner.RemovePartitionKeyRange(tablet->partition().end());
}
+
return Status::OK();
}
diff --git a/src/kudu/client/scan_token-internal.h b/src/kudu/client/scan_token-internal.h
index 6296da0..97d4640 100644
--- a/src/kudu/client/scan_token-internal.h
+++ b/src/kudu/client/scan_token-internal.h
@@ -17,6 +17,8 @@
#pragma once
+#include <cstdint>
+
#include <memory>
#include <string>
#include <vector>
@@ -80,10 +82,15 @@
include_tablet_metadata_ = include_metadata;
}
+ void SplitSizeBytes(uint64_t split_size_bytes) {
+ split_size_bytes_ = split_size_bytes;
+ }
+
private:
ScanConfiguration configuration_;
bool include_table_metadata_ = true;
bool include_tablet_metadata_ = true;
+ uint64_t split_size_bytes_ = 0;
};
} // namespace client