// blob: 66d504a83da62c075501b062019e6d1ed5144d72 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_predicate.h"
#include "kudu/client/schema.h"
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/util/curl_util.h"
#include "kudu/util/faststring.h"
#include "kudu/util/metrics.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_bool(enable_per_range_hash_schemas);
DECLARE_int32(flush_threshold_secs);
DECLARE_int32(heartbeat_interval_ms);
DECLARE_string(webserver_doc_root);
METRIC_DECLARE_counter(scans_started);
using kudu::client::sp::shared_ptr;
using kudu::client::KuduValue;
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
using kudu::master::CatalogManager;
using kudu::master::TableInfo;
using kudu::master::TabletInfo;
using kudu::tablet::TabletReplica;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
// Names of the columns used by the test schemas and partitioning rules
// throughout this file.
static constexpr const char* const kKeyColumn = "key";
static constexpr const char* const kIntValColumn = "int_val";
static constexpr const char* const kStringValColumn = "string_val";
namespace kudu {
namespace client {
// Base fixture for the flexible partitioning tests in this file. It builds
// the test schema, starts an InternalMiniCluster, connects a client to it,
// and provides helpers to create range partitions and tables, insert rows,
// and verify tablet/row counts.
class FlexPartitioningTest : public KuduTest {
 public:
  // Builds the default schema: INT32 primary key column 'key', a non-nullable
  // INT32 column 'int_val' and a nullable STRING column 'string_val'.
  FlexPartitioningTest() {
    KuduSchemaBuilder b;
    b.AddColumn(kKeyColumn)->
        Type(KuduColumnSchema::INT32)->
        NotNull()->
        PrimaryKey();
    b.AddColumn(kIntValColumn)->
        Type(KuduColumnSchema::INT32)->
        NotNull();
    b.AddColumn(kStringValColumn)->
        Type(KuduColumnSchema::STRING)->
        Nullable();
    CHECK_OK(b.Build(&schema_));
  }

  // Number of tablet servers to start in the mini-cluster; derived fixtures
  // may override it.
  virtual int num_tablet_servers() const {
    return 1;
  }

  // Starts the mini-cluster and builds a client connected to its master.
  void SetUp() override {
    KuduTest::SetUp();

    // Reduce the TS<->Master heartbeat interval to speed up testing.
    FLAGS_heartbeat_interval_ms = 10;

    // Ensure the static pages are not available as tests are written based
    // on this value of the flag
    FLAGS_webserver_doc_root = "";

    InternalMiniClusterOptions opt;
    opt.num_tablet_servers = num_tablet_servers();

    // Start minicluster and wait for tablet servers to connect to master.
    cluster_.reset(new InternalMiniCluster(env_, std::move(opt)));
    ASSERT_OK(cluster_->Start());

    // Connect to the cluster.
    ASSERT_OK(KuduClientBuilder()
        .add_master_server_addr(
            cluster_->mini_master()->bound_rpc_addr().ToString())
        .Build(&client_));
  }

 protected:
  using RangePartition = unique_ptr<KuduRangePartition>;
  using RangePartitions = vector<RangePartition>;

  // Builds an INSERT for the row (key_val, int_val, string_val) and applies it
  // to 'session'. Returns the first non-OK status encountered.
  static Status ApplyInsert(KuduSession* session,
                            const shared_ptr<KuduTable>& table,
                            int32_t key_val,
                            int32_t int_val,
                            string string_val) {
    unique_ptr<KuduInsert> insert(table->NewInsert());
    RETURN_NOT_OK(insert->mutable_row()->SetInt32(kKeyColumn, key_val));
    RETURN_NOT_OK(insert->mutable_row()->SetInt32(kIntValColumn, int_val));
    RETURN_NOT_OK(insert->mutable_row()->SetStringCopy(
        kStringValColumn, std::move(string_val)));
    return session->Apply(insert.release());
  }

  // If 'errors' is non-null, moves the pending errors of 'session' into it.
  // Returns RuntimeError if the session reports that its error buffer has
  // overflowed, OK otherwise (including when 'errors' is null).
  static Status FetchSessionErrors(KuduSession* session,
                                   vector<KuduError*>* errors = nullptr) {
    if (errors) {
      // Initialized defensively so the check below never reads an
      // indeterminate value.
      bool overflowed = false;
      session->GetPendingErrors(errors, &overflowed);
      if (PREDICT_FALSE(overflowed)) {
        return Status::RuntimeError("session error buffer overflowed");
      }
    }
    return Status::OK();
  }

  // Inserts rows with keys in [key_beg, key_end) into 'table_name' using the
  // given flush mode; the non-key columns get values derived from rand().
  // On failure, per-row errors are collected into 'errors' (when non-null) and
  // the failing status is returned.
  Status InsertTestRows(
      const char* table_name,
      int32_t key_beg,
      int32_t key_end,
      KuduSession::FlushMode flush_mode = KuduSession::AUTO_FLUSH_SYNC,
      vector<KuduError*>* errors = nullptr) {
    CHECK_LE(key_beg, key_end);
    shared_ptr<KuduTable> table;
    RETURN_NOT_OK(client_->OpenTable(table_name, &table));
    shared_ptr<KuduSession> session = client_->NewSession();
    RETURN_NOT_OK(session->SetFlushMode(flush_mode));
    session->SetTimeoutMillis(60000);
    for (int32_t key_val = key_beg; key_val < key_end; ++key_val) {
      if (const auto s = ApplyInsert(session.get(),
                                     table,
                                     key_val,
                                     rand(),
                                     std::to_string(rand()));
          !s.ok()) {
        RETURN_NOT_OK(FetchSessionErrors(session.get(), errors));
        return s;
      }
    }

    const auto s = session->Flush();
    if (!s.ok()) {
      RETURN_NOT_OK(FetchSessionErrors(session.get(), errors));
    }

    return s;
  }

  // Creates a single-replica table named 'table_name' that is range
  // partitioned on the 'key' column, adding every element of 'partitions' as
  // a custom range partition. No table-wide hash schema is set.
  Status CreateTable(const char* table_name, RangePartitions partitions) {
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    table_creator->table_name(table_name)
        .schema(&schema_)
        .num_replicas(1)
        .set_range_partition_columns({ kKeyColumn });

    for (auto& p : partitions) {
      // The released pointer is handed over to the table creator.
      table_creator->add_custom_range_partition(p.release());
    }

    return table_creator->Create();
  }

  // Creates a range partition on 'key_column' of 'schema' with the given
  // lower and upper bound values.
  static RangePartition CreateRangePartition(const KuduSchema& schema,
                                             const string& key_column,
                                             int32_t lower_bound,
                                             int32_t upper_bound) {
    unique_ptr<KuduPartialRow> lower(schema.NewRow());
    CHECK_OK(lower->SetInt32(key_column, lower_bound));
    unique_ptr<KuduPartialRow> upper(schema.NewRow());
    CHECK_OK(upper->SetInt32(key_column, upper_bound));
    return unique_ptr<KuduRangePartition>(
        new KuduRangePartition(lower.release(), upper.release()));
  }

  // Same as above, but for the fixture's schema and the 'key' column.
  RangePartition CreateRangePartition(int32_t lower_bound = 0,
                                      int32_t upper_bound = 100) {
    return CreateRangePartition(schema_, kKeyColumn, lower_bound, upper_bound);
  }

  // Creates a range partition whose lower bound row is left unset and whose
  // upper bound is 'upper_bound'.
  RangePartition CreateRangePartitionNoLowerBound(int32_t upper_bound) {
    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
    CHECK_OK(upper->SetInt32(kKeyColumn, upper_bound));
    return unique_ptr<KuduRangePartition>(
        new KuduRangePartition(schema_.NewRow(), upper.release()));
  }

  // Creates a range partition whose lower bound is 'lower_bound' and whose
  // upper bound row is left unset.
  RangePartition CreateRangePartitionNoUpperBound(int32_t lower_bound) {
    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
    CHECK_OK(lower->SetInt32(kKeyColumn, lower_bound));
    return unique_ptr<KuduRangePartition>(
        new KuduRangePartition(lower.release(), schema_.NewRow()));
  }

  // Asserts that the master's catalog reports 'expected_count' tablets for
  // the table named 'table_name'.
  void CheckTabletCount(const char* table_name, int expected_count) {
    shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(table_name, &table));

    scoped_refptr<master::TableInfo> table_info;
    {
      auto* cm = cluster_->mini_master(0)->master()->catalog_manager();
      CatalogManager::ScopedLeaderSharedLock l(cm);
      ASSERT_OK(cm->GetTableInfo(table->id(), &table_info));
    }
    ASSERT_EQ(expected_count, table_info->num_tablets());
  }

  // Scans the whole table (LEADER_ONLY selection, READ_YOUR_WRITES mode) and
  // asserts that it contains exactly 'expected_count' rows.
  void CheckTableRowsNum(const char* table_name,
                         int64_t expected_count) {
    shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(table_name, &table));
    KuduScanner scanner(table.get());
    ASSERT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
    ASSERT_OK(scanner.SetReadMode(KuduScanner::READ_YOUR_WRITES));
    ASSERT_OK(scanner.Open());

    int64_t count = 0;
    KuduScanBatch batch;
    while (scanner.HasMoreRows()) {
      ASSERT_OK(scanner.NextBatch(&batch));
      count += batch.NumRows();
    }
    ASSERT_EQ(expected_count, count);
  }

  // Scans 'table_name' with optional 'col_name >= lower' and
  // 'col_name < upper' predicates, asserts that 'expected_count' rows are
  // returned, and then verifies the scans-started total via
  // CheckTabletsScanned().
  //
  // A 'lower' equal to INT8_MIN or an 'upper' equal to INT8_MAX means that the
  // corresponding predicate is not added.
  // NOTE(review): the sentinels are 8-bit limits while the bounds are 'int',
  // so -128 and 127 cannot be used as real bounds; confirm this is intended
  // with the callers.
  void CheckScanWithColumnPredicate(const string& table_name, Slice col_name,
                                    int expected_count, int tablets, int lower, int upper) {
    shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(table_name, &table));
    KuduScanner scanner(table.get());
    ASSERT_OK(scanner.SetTimeoutMillis(60000));
    if (lower != INT8_MIN) {
      ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
          col_name, KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(lower))));
    }
    if (upper != INT8_MAX) {
      ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
          col_name, KuduPredicate::LESS, KuduValue::FromInt(upper))));
    }

    ASSERT_OK(scanner.Open());
    int64_t count = 0;
    while (scanner.HasMoreRows()) {
      KuduScanBatch batch;
      ASSERT_OK(scanner.NextBatch(&batch));
      count += batch.NumRows();
    }
    // Expected value first, consistent with the other checks in this fixture.
    ASSERT_EQ(expected_count, count);
    NO_FATALS(CheckTabletsScanned(table_name, tablets));
  }

  // Sums the 'scans_started' counter over the replicas of every tablet of
  // 'table_name' on every tablet server, resetting each counter after reading
  // it, and asserts that the total equals 'tablets'. The lookup of each tablet
  // is asserted on each tablet server, so every server is expected to host a
  // replica of every tablet.
  void CheckTabletsScanned(const string& table_name, int tablets) {
    shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(table_name, &table));
    vector<scoped_refptr<master::TabletInfo>> all_tablets_info;
    {
      auto* cm = cluster_->mini_master(0)->master()->catalog_manager();
      CatalogManager::ScopedLeaderSharedLock l(cm);
      scoped_refptr<master::TableInfo> table_info;
      ASSERT_OK(cm->GetTableInfo(table->id(), &table_info));
      table_info->GetAllTablets(&all_tablets_info);
    }
    int64_t tablets_scanned = 0;
    for (const auto& tablet_info : all_tablets_info) {
      for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
        scoped_refptr<tablet::TabletReplica> replica;
        ASSERT_TRUE(cluster_->mini_tablet_server(i)->server()->
            tablet_manager()->LookupTablet(tablet_info->id(), &replica));
        ASSERT_TRUE(replica->tablet()->GetMetricEntity());
        auto scans_started = METRIC_scans_started.Instantiate(replica->tablet()->GetMetricEntity());
        tablets_scanned += scans_started->value();
        // Reset so that a subsequent check starts counting from zero.
        scans_started->Reset();
      }
    }
    ASSERT_EQ(tablets, tablets_scanned);
  }

  // Scans 'table_name' with the predicates 'col_name >= lower' and
  // 'col_name < upper' and asserts that 'expected_row_count' rows are
  // returned.
  void CheckTableRowsNum(const char* table_name, const char* col_name,
                         int lower, int upper, int expected_row_count) {
    shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(table_name, &table));
    KuduScanner scanner(table.get());
    ASSERT_OK(scanner.SetTimeoutMillis(60000));
    ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
        col_name, KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(lower))));
    ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
        col_name, KuduPredicate::LESS, KuduValue::FromInt(upper))));

    ASSERT_OK(scanner.Open());
    KuduScanBatch batch;
    int live_row_count = 0;
    while (scanner.HasMoreRows()) {
      ASSERT_OK(scanner.NextBatch(&batch));
      live_row_count += batch.NumRows();
    }
    ASSERT_EQ(expected_row_count, live_row_count);
  }

  KuduSchema schema_;                        // schema used for created tables
  unique_ptr<InternalMiniCluster> cluster_;  // started in SetUp()
  shared_ptr<KuduClient> client_;            // connected to cluster_'s master
};
// Fixture for scenarios covering range partitioning with custom hash bucket
// schemas specified when creating a table. It adds nothing on top of
// FlexPartitioningTest; it only gives the create-table scenarios their own
// test suite name.
class FlexPartitioningCreateTableTest : public FlexPartitioningTest {};
// A table with a single range partition that uses the table-wide hash schema
// (two buckets on 'key') should have two tablets and accept rows in the range.
TEST_F(FlexPartitioningCreateTableTest, TableWideHashSchema) {
  // Partition layout of the table under test:
  //
  //            hash bucket
  //   key    0           1
  //         -------------------------
  //  <111    x:{key}     x:{key}
  constexpr const char* const kTableName = "TableWideHashSchema";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 2);
  creator->set_range_partition_columns({ kKeyColumn });

  // The only range, [INT32_MIN, 111), follows the table-wide hash schema.
  {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, INT32_MIN));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  ASSERT_OK(creator->Create());
  // One tablet per hash bucket.
  NO_FATALS(CheckTabletCount(kTableName, 2));

  // Keys [-111, 111) amount to 222 rows.
  ASSERT_OK(InsertTestRows(kTableName, -111, 111, KuduSession::MANUAL_FLUSH));
  NO_FATALS(CheckTableRowsNum(kTableName, 222));
}
// With no table-wide hash schema, a regular range and a custom range that has
// no hash bucketing either should each be backed by a single tablet.
TEST_F(FlexPartitioningCreateTableTest, EmptyTableWideHashSchema) {
  constexpr const char* const kTableName = "EmptyTableWideHashSchema";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range [INT32_MIN, 111) with the table-wide hash partitioning rules, i.e.
  // without any hash bucketing.
  {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, INT32_MIN));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  // Custom range [111, 222): no hash bucketing as well.
  {
    auto custom_range = CreateRangePartition(111, 222);
    creator->add_custom_range_partition(custom_range.release());
  }

  ASSERT_OK(creator->Create());
  // One tablet per range, two in total.
  NO_FATALS(CheckTabletCount(kTableName, 2));

  // Keys [-111, 222) amount to 333 rows.
  ASSERT_OK(InsertTestRows(kTableName, -111, 222, KuduSession::MANUAL_FLUSH));
  NO_FATALS(CheckTableRowsNum(kTableName, 333));
}
// Try to create a table whose only range partition has a custom hash schema
// while the table itself has no table-wide hash schema.
TEST_F(FlexPartitioningCreateTableTest, CustomHashSchemaDiffersFromTableWide) {
  // The custom hash schema of the range is the one-level hash bucketing
  // { 3, "key" }. FlexPartitioningTest::CreateTable() does not add any
  // table-wide hash dimensions, so the number of hash dimensions differs
  // between the range and the table, and the attempt to create such a table
  // is expected to fail with the Status::NotSupported() status.
  constexpr const char* const kTableName = "3@key";

  auto range = CreateRangePartition(0, 100);
  ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 3, 0));

  RangePartitions ranges;
  ranges.push_back(std::move(range));

  const auto status = CreateTable(kTableName, std::move(ranges));
  ASSERT_TRUE(status.IsNotSupported()) << status.ToString();
  ASSERT_STR_CONTAINS(status.ToString(),
      "varying number of hash dimensions per range is not yet supported");
}
// A table with a two-bucket table-wide hash schema plus one custom range that
// has no hash bucketing at all.
//
// TODO(aserbin): re-enable this scenario once varying hash dimensions per range
//                are supported
TEST_F(FlexPartitioningCreateTableTest, DISABLED_SingleCustomRangeEmptyHashSchema) {
  // Partition layout of the table under test:
  //
  //            hash bucket
  //   key    0           1
  //         -------------------------
  //  <111    x:{key}     x:{key}
  // 111-222  -           -
  constexpr const char* const kTableName = "SingleCustomRangeEmptyHashSchema";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 2);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range [INT32_MIN, 111) with the table-wide hash partitioning rules.
  {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, INT32_MIN));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  // Custom range [111, 222). KuduRangePartition::add_hash_partitions() is
  // deliberately not called on it, so the range has no hash bucketing.
  {
    auto custom_range = CreateRangePartition(111, 222);
    creator->add_custom_range_partition(custom_range.release());
  }

  ASSERT_OK(creator->Create());
  // Two tablets for the hashed range plus one for the custom range.
  NO_FATALS(CheckTabletCount(kTableName, 3));

  // Make sure rows can be inserted into all the existing partitions: first
  // the range with the table-wide hash schema, then the custom range.
  ASSERT_OK(InsertTestRows(kTableName, -111, 0));
  NO_FATALS(CheckTableRowsNum(kTableName, 111));
  ASSERT_OK(InsertTestRows(kTableName, 111, 222));
  NO_FATALS(CheckTableRowsNum(kTableName, 222));
}
// Create a table with a mixed set of range partitions: one using the
// table-wide hash schema and several using custom hash bucket schemas.
TEST_F(FlexPartitioningCreateTableTest, DefaultAndCustomHashSchemas) {
  // Partition layout of the table under test:
  //
  //            hash bucket
  //   key    0           1           2               3
  //         -----------------------------------------------------------
  //  <111    x:{key}     x:{key}     -               -
  // 111-222  x:{key}     x:{key}     x:{key}         -
  // 222-333  x:{key}     x:{key}     x:{key}     x:{key}
  // 333-444  x:{key}     x:{key}     -               -
  constexpr const char* const kTableName = "DefaultAndCustomHashSchemas";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 2);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range [INT32_MIN, 111) with the table-wide hash partitioning rules.
  {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, INT32_MIN));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  // Ranges with custom hash sub-partitioning rules, each hashing on the "key"
  // column: 3 buckets with seed 1, 4 buckets with seed 2, 2 buckets with
  // seed 3.
  struct CustomRange {
    int32_t lower;
    int32_t upper;
    int32_t num_buckets;
    int32_t seed;
  };
  const CustomRange kCustomRanges[] = {
    { 111, 222, 3, 1 },
    { 222, 333, 4, 2 },
    { 333, 444, 2, 3 },
  };
  for (const auto& r : kCustomRanges) {
    auto custom_range = CreateRangePartition(r.lower, r.upper);
    ASSERT_OK(custom_range->add_hash_partitions(
        { kKeyColumn }, r.num_buckets, r.seed));
    creator->add_custom_range_partition(custom_range.release());
  }

  ASSERT_OK(creator->Create());
  // 2 + 3 + 4 + 2 hash buckets across the four ranges.
  NO_FATALS(CheckTabletCount(kTableName, 11));

  // Make sure rows can be inserted into all the existing partitions: first
  // the range with the table-wide hash schema, then the ranges with custom
  // hash schemas.
  ASSERT_OK(InsertTestRows(kTableName, -111, 0));
  NO_FATALS(CheckTableRowsNum(kTableName, 111));
  ASSERT_OK(InsertTestRows(kTableName, 111, 444));
  NO_FATALS(CheckTableRowsNum(kTableName, 444));

  // Inserting into a non-covered range should surface a proper error status
  // to the client attempting the operation.
  {
    vector<KuduError*> row_errors;
    ElementDeleter cleanup(&row_errors);
    const auto status = InsertTestRows(
        kTableName, 444, 445, KuduSession::AUTO_FLUSH_SYNC, &row_errors);
    ASSERT_TRUE(status.IsIOError()) << status.ToString();
    ASSERT_STR_CONTAINS(status.ToString(), "failed to flush data");
    ASSERT_EQ(1, row_errors.size());
    const auto& row_status = row_errors[0]->status();
    EXPECT_TRUE(row_status.IsNotFound()) << row_status.ToString();
    ASSERT_STR_CONTAINS(row_status.ToString(),
                        "No tablet covering the requested range partition");
  }

  // Same as above, but for a batch of rows to make sure custom hash bucketing
  // isn't inducing any unexpected side effects.
  {
    constexpr int kNumRows = 10;
    vector<KuduError*> row_errors;
    ElementDeleter cleanup(&row_errors);
    const auto status = InsertTestRows(
        kTableName, 445, 445 + kNumRows, KuduSession::MANUAL_FLUSH, &row_errors);
    ASSERT_TRUE(status.IsIOError()) << status.ToString();
    ASSERT_STR_CONTAINS(status.ToString(), "failed to flush data");
    ASSERT_EQ(kNumRows, row_errors.size());
    for (const auto& row_error : row_errors) {
      const auto& row_status = row_error->status();
      EXPECT_TRUE(row_status.IsNotFound()) << row_status.ToString();
      ASSERT_STR_CONTAINS(row_status.ToString(),
                          "No tablet covering the requested range partition");
    }
  }
}
// Verify that the /tablets page of the tablet server's web UI renders the
// partition information for ranges with custom hash schemas.
TEST_F(FlexPartitioningCreateTableTest, TabletServerWebUI) {
  // Partition layout of the table under test:
  //
  //            hash bucket
  //   key    0           1           2               3
  //         -----------------------------------------------------------
  //  <111    x:{key}     x:{key}     -               -
  // 111-222  x:{key}     x:{key}     x:{key}         -
  // 222-333  x:{key}     x:{key}     x:{key}     x:{key}
  constexpr const char* const kTableName = "TabletServerWebUI";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 2);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range [INT32_MIN, 111) with the table-wide hash partitioning rules.
  {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, INT32_MIN));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  // Range [111, 222) with a custom hash schema: 3 buckets hashing on the
  // "key" column with hash seed 1.
  {
    auto custom_range = CreateRangePartition(111, 222);
    ASSERT_OK(custom_range->add_hash_partitions({ kKeyColumn }, 3, 1));
    creator->add_custom_range_partition(custom_range.release());
  }

  // Range [222, 333) with a custom hash schema: 4 buckets hashing on the
  // "key" column with hash seed 2.
  {
    auto custom_range = CreateRangePartition(222, 333);
    ASSERT_OK(custom_range->add_hash_partitions({ kKeyColumn }, 4, 2));
    creator->add_custom_range_partition(custom_range.release());
  }

  ASSERT_OK(creator->Create());
  NO_FATALS(CheckTabletCount(kTableName, 9));

  // Fetch the contents of the tablet server's /tablets page.
  EasyCurl curl;
  faststring page;
  const string url = Substitute(
      "http://$0/tablets",
      cluster_->mini_tablet_server(0)->bound_http_addr().ToString());
  ASSERT_OK(curl.FetchURL(url, &page));
  string raw = page.ToString();

  // Get the list of tablets of the only table known to the master.
  std::vector<scoped_refptr<TableInfo>> tables;
  {
    auto* cm = cluster_->mini_master(0)->master()->catalog_manager();
    CatalogManager::ScopedLeaderSharedLock l(cm);
    cm->GetAllTables(&tables);
  }
  ASSERT_EQ(1, tables.size());
  vector<scoped_refptr<TabletInfo>> tablets;
  tables.front()->GetAllTablets(&tablets);
  ASSERT_EQ(9, tablets.size());

  // Expected rendering of the partition information, one format string per
  // tablet in the order the tablets are returned above; $0 stands for the
  // tablet's identifier.
  const char* const kExpectedFormats[] = {
    "id=$0\"},\"partition\":\"HASH (key) PARTITION 0, "
    "RANGE (key) PARTITION -2147483648 <= VALUES < 111",
    "id=$0\"},\"partition\":\"HASH (key) PARTITION 0, "
    "RANGE (key) PARTITION 111 <= VALUES < 222",
    "id=$0\"},\"partition\":\"HASH (key) PARTITION 0, "
    "RANGE (key) PARTITION 222 <= VALUES < 333",
    "id=$0\"},\"partition\":\"HASH (key) PARTITION 1, "
    "RANGE (key) PARTITION -2147483648 <= VALUES < 111",
    "id=$0\"},\"partition\":\"HASH (key) PARTITION 1, "
    "RANGE (key) PARTITION 111 <= VALUES < 222",
    "id=$0\"},\"partition\":\"HASH (key) PARTITION 1, "
    "RANGE (key) PARTITION 222 <= VALUES < 333",
    "id=$0\"},\"partition\":\"HASH (key) PARTITION 2, "
    "RANGE (key) PARTITION 111 <= VALUES < 222",
    "id=$0\"},\"partition\":\"HASH (key) PARTITION 2, "
    "RANGE (key) PARTITION 222 <= VALUES < 333",
    "id=$0\"},\"partition\":\"HASH (key) PARTITION 3, "
    "RANGE (key) PARTITION 222 <= VALUES < 333",
  };

  // Validate the partition information rendered in the page.
  size_t tablet_idx = 0;
  for (const char* format : kExpectedFormats) {
    ASSERT_STR_CONTAINS(raw, Substitute(format, tablets[tablet_idx]->id()));
    ++tablet_idx;
  }
}
// Parameters for a single hash dimension.
struct HashDimensionParameters {
  std::vector<std::string> columns;  // names of the columns to use for hash bucketing
  int32_t num_buckets;               // number of hash buckets
};

// Prints the parameters as "{columns:{c1,c2},num_buckets:N}". The output is
// meant for diagnostics such as the description of a test parameter, so the
// braces are balanced and the column names are separated by commas without
// a trailing one.
std::ostream& operator<<(std::ostream& os,
                         const HashDimensionParameters& params) {
  os << "{columns:{";
  const char* separator = "";
  for (const auto& column : params.columns) {
    os << separator << column;
    separator = ",";
  }
  os << "},num_buckets:" << params.num_buckets << "}";
  return os;
}
// Hash bucketing parameters for a range partition.
typedef vector<HashDimensionParameters> HashPartitionParameters;
std::ostream& operator<<(std::ostream& os,
const HashPartitionParameters& params) {
os << "{";
for (const auto& e : params) {
os << e << ",";
}
os << "}";
return os;
}
// Bounds of a range partition on the 'key' column.
struct RangeBounds {
  int32_t lower;  // inclusive lower bound
  int32_t upper;  // exclusive upper bound
};

// Everything needed to describe a single range partition: its bounds and its
// hash schema.
struct PerRangeParameters {
  RangeBounds bounds;
  HashPartitionParameters hash_params;
};

// Prints the range as "{bounds:[lower,upper),{<hash dimensions>}}", where
// every hash dimension is followed by a comma.
std::ostream& operator<<(std::ostream& os,
                         const PerRangeParameters& params) {
  const RangeBounds& range = params.bounds;
  os << "{bounds:[" << range.lower << "," << range.upper << "),{";
  for (auto it = params.hash_params.begin();
       it != params.hash_params.end(); ++it) {
    os << *it << ",";
  }
  os << "}}";
  return os;
}
typedef vector<PerRangeParameters> TablePartitionParameters;
std::ostream& operator<<(std::ostream& os,
const TablePartitionParameters& params) {
os << "{";
for (const auto& per_range_params : params) {
os << per_range_params;
}
os << "}";
return os;
}
struct PerRangeTestParameters {
bool allowed;
TablePartitionParameters partition_params;
vector<HashDimensionParameters> table_wide_hash_schema_params;
};
std::ostream& operator<<(std::ostream& os,
const PerRangeTestParameters& params) {
os << "allowed:" << std::boolalpha << params.allowed
<< " partition_params:" << params.partition_params;
return os;
}
// Parameterized fixture: tries to create a table with the custom range
// partitions and the table-wide hash schema given by the test parameter and
// checks whether the attempt succeeds or is rejected as not supported.
class FlexPartitioningCreateTableParamTest :
    public FlexPartitioningCreateTableTest,
    public testing::WithParamInterface<PerRangeTestParameters> {
 public:
  // Replaces the schema built by the base fixture with one that has the
  // composite primary key ('key', 'int_val'), so both columns can be used for
  // hash bucketing.
  FlexPartitioningCreateTableParamTest() {
    KuduSchemaBuilder builder;
    builder.AddColumn(kKeyColumn)->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn(kIntValColumn)->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn(kStringValColumn)->Type(KuduColumnSchema::STRING);
    builder.SetPrimaryKey({ kKeyColumn, kIntValColumn });
    CHECK_OK(builder.Build(&schema_));
  }

  // Runs the scenario described by GetParam().
  void Run() {
    constexpr const char* const kTableName = "PerRangeHashSchemaVariations";

    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
    creator->table_name(kTableName);
    creator->schema(&schema_);
    creator->num_replicas(1);
    creator->set_range_partition_columns({ kKeyColumn });

    const PerRangeTestParameters& scenario = GetParam();

    // Custom ranges: every hash dimension is added with the hash seed 0.
    for (const auto& range_params : scenario.partition_params) {
      const auto& bounds = range_params.bounds;
      auto range = CreateRangePartition(bounds.lower, bounds.upper);
      for (const auto& dimension : range_params.hash_params) {
        ASSERT_OK(range->add_hash_partitions(
            dimension.columns, dimension.num_buckets, 0));
      }
      creator->add_custom_range_partition(range.release());
    }

    // Table-wide hash schema.
    for (const auto& dimension : scenario.table_wide_hash_schema_params) {
      creator->add_hash_partitions(dimension.columns, dimension.num_buckets);
    }

    const auto status = creator->Create();
    if (scenario.allowed) {
      ASSERT_OK(status);
      return;
    }
    ASSERT_TRUE(status.IsNotSupported()) << status.ToString();
    ASSERT_STR_CONTAINS(status.ToString(),
        "varying number of hash dimensions per range is not yet supported");
  }
};
// Scenarios for FlexPartitioningCreateTableParamTest. Each entry lists, in
// order: whether the table creation is expected to succeed, the custom range
// partitions (bounds plus per-range hash dimensions given as
// { columns, number of buckets }), and the table-wide hash schema.
const vector<PerRangeTestParameters> kPerRangeTestParameters = {
// A single range with no hashing at all.
{
true,
{
{ { INT32_MIN, 0 }, {} },
},
{} // empty table-wide hash schema
},
// A single range with minimal hash schema.
{
true,
{
{ { INT32_MIN, 0 }, { { { kKeyColumn }, 2, } } },
},
{
{ { kKeyColumn }, 3 },
}
},
// A single range with hashing on both columns of the primary key.
{
true,
{
{ { INT32_MIN, 0 }, { { { kKeyColumn, kIntValColumn }, 2, } } },
},
{
{ { kKeyColumn }, 5 },
}
},
// Varying the number of hash buckets.
{
true,
{
{ { 0, 100 }, { { { kKeyColumn }, 2, } } },
{ { 100, 200 }, { { { kKeyColumn }, 3, } } },
{ { 200, 300 }, { { { kKeyColumn }, 2, } } },
{ { 300, 400 }, { { { kKeyColumn }, 10, } } },
},
{
{ { kIntValColumn }, 5 },
}
},
// Varying the set of columns for hash bucketing.
{
true,
{
{ { 100, 200 }, { { { kKeyColumn }, 2, } } },
{ { 200, 300 }, { { { kKeyColumn, kIntValColumn }, 2, } } },
{ { 300, 400 }, { { { kIntValColumn }, 2, } } },
{ { 400, 500 }, { { { kIntValColumn, kKeyColumn }, 2, } } },
},
{
{ { kIntValColumn }, 3 },
}
},
// Varying the set of columns for hash bucketing and number of buckets.
{
true,
{
{ { 200, 300 }, { { { kKeyColumn }, 10, } } },
{ { 300, 400 }, { { { kKeyColumn, kIntValColumn }, 2, } } },
{ { 400, 500 }, { { { kKeyColumn, kIntValColumn }, 5, } } },
{ { 500, 600 }, { { { kIntValColumn }, 8, } } },
{ { 600, 700 }, { { { kKeyColumn }, 3, } } },
{ { 700, 800 }, { { { kIntValColumn, kKeyColumn }, 16, } } },
{ { 800, 900 }, { { { kIntValColumn }, 32, } } },
},
{
{ { kIntValColumn }, 7 },
}
},
// Below are multiple scenarios with varying number of hash dimensions
// for hash bucketing.
// Ranges with zero and one hash dimension.
{
false,
{
{ { INT32_MIN, 0 }, {} },
{ { 0, 100 }, { { { kKeyColumn }, 2 } } },
},
{} // an empty table-wide schema
},
// Ranges with one and zero hash dimensions; one table-wide hash dimension.
{
false,
{
{ { INT32_MIN, 0 }, { { { kKeyColumn }, 3 } } },
{ { 0, 100 }, {} },
},
{
{ { kKeyColumn }, 8 },
}
},
// Ranges with zero, one, and zero hash dimensions.
{
false,
{
{ { INT32_MIN, 0 }, {} },
{ { 0, 100 }, { { { kKeyColumn }, 2 } } },
{ { 100, 200 }, {} },
},
{} // an empty table-wide schema
},
// Ranges with one, zero, and one hash dimension.
{
false,
{
{ { INT32_MIN, 0 }, { { { kKeyColumn }, 3 } } },
{ { 0, 100 }, {} },
{ { 100, 200 }, { { { kIntValColumn }, 2 } } },
},
{} // an empty table-wide schema
},
// Ranges with two and one hash dimensions; two table-wide hash dimensions.
{
false,
{
{ { 0, 100 }, { { { kKeyColumn }, 2 }, { { kIntValColumn }, 2 } } },
{ { 100, 200 }, { { { kKeyColumn, kIntValColumn }, 2 } } },
},
{
{ { kKeyColumn }, 7 },
{ { kIntValColumn }, 3 },
}
},
// Ranges with two and one hash dimensions; one table-wide hash dimension.
{
false,
{
{ { 100, 200 }, { { { kKeyColumn }, 2 }, { { kIntValColumn }, 3 } } },
{ { 200, 300 }, { { { kKeyColumn }, 3 } } },
},
{
{ { kKeyColumn }, 7 },
}
},
// Same hash schemas as in the previous scenario, with different range bounds.
{
false,
{
{ { 300, 400 }, { { { kKeyColumn }, 2 }, { { kIntValColumn }, 3 } } },
{ { 400, 500 }, { { { kKeyColumn }, 3 } } },
},
{
{ { kKeyColumn }, 7 },
}
},
// Ranges with two, one, and zero hash dimensions.
{
false,
{
{ { 500, 600 }, { { { kKeyColumn }, 10 }, { { kIntValColumn }, 5 } } },
{ { 600, 700 }, { { { kKeyColumn }, 2 } } },
{ { 700, INT32_MAX }, {} },
},
{} // an empty table-wide schema
},
};
// Instantiate the parameterized fixture once per entry of
// kPerRangeTestParameters.
INSTANTIATE_TEST_SUITE_P(Variations, FlexPartitioningCreateTableParamTest,
testing::ValuesIn(kPerRangeTestParameters));
// Run the table creation scenario for the current parameter; NO_FATALS()
// propagates fatal assertion failures raised inside Run().
TEST_P(FlexPartitioningCreateTableParamTest, PerRangeHashSchemaVariations) {
NO_FATALS(Run());
}
// Creating a table where a custom range has a different number of hash
// dimensions than the table-wide hash schema is expected to be rejected.
TEST_F(FlexPartitioningCreateTableTest, CustomHashSchemasVaryingDimensionsNumber) {
  constexpr const char* const kTableName = "CustomHashSchemasVaryingDimensions";

  // Partition layout of the table this scenario attempts to create:
  //
  //            hash bucket
  //   key    0           1
  //         -------------------------
  //  <111    x:{key}     x:{key}
  // 111-222  -           -
  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 2);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range [INT32_MIN, 111) with the table-wide hash partitioning rules.
  {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, INT32_MIN));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  // Custom range [111, 222). KuduRangePartition::add_hash_partitions() is
  // deliberately not called on it, so the range has no hash bucketing.
  {
    auto custom_range = CreateRangePartition(111, 222);
    creator->add_custom_range_partition(custom_range.release());
  }

  const auto status = creator->Create();
  ASSERT_TRUE(status.IsNotSupported()) << status.ToString();
  ASSERT_STR_CONTAINS(status.ToString(),
      "varying number of hash dimensions per range is not yet supported");
}
// This scenario creates a table with a range partition that has no upper
// bound. That range has a custom empty hash schema (i.e. no hash bucketing
// for the range) while the table-wide hash schema is not empty.
//
// TODO(aserbin): re-enable this scenario once varying hash dimensions per range
//                are supported
TEST_F(FlexPartitioningCreateTableTest, DISABLED_NoUpperBoundRangeCustomHashSchema) {
  // Partition layout of the table under test:
  //
  //            hash bucket
  //   key    0           1           2
  //         --------------------------------
  //   0-111  x:{key}     x:{key}     x:{key}
  // 111-222  x:{key}     x:{key}     -
  //  >=222   -           -           -
  constexpr const char* const kTableName = "NoUpperBoundRangeCustomHashSchema";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 3);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range [0, 111) with the table-wide hash partitioning rules.
  {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, 0));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  // Range [111, 222) with a custom hash schema: 2 buckets hashing on the
  // "key" column with hash seed 0.
  {
    auto custom_range = CreateRangePartition(111, 222);
    ASSERT_OK(custom_range->add_hash_partitions({ kKeyColumn }, 2, 0));
    creator->add_custom_range_partition(custom_range.release());
  }

  // Range starting at 222 with no upper bound and no hash bucketing.
  {
    auto unbounded_range = CreateRangePartitionNoUpperBound(222);
    creator->add_custom_range_partition(unbounded_range.release());
  }

  ASSERT_OK(creator->Create());
  // 3 + 2 + 1 tablets across the three ranges.
  NO_FATALS(CheckTabletCount(kTableName, 6));

  // Make sure rows can be inserted into all the existing partitions.
  ASSERT_OK(InsertTestRows(kTableName, 0, 555));
  NO_FATALS(CheckTableRowsNum(kTableName, 555));
}
// Create a table with unbounded ranges having custom hash schemas.
TEST_F(FlexPartitioningCreateTableTest, UnboundedRangesWithCustomHashSchemas) {
  constexpr const char* const kTableName =
      "UnboundedRangesWithCustomHashSchemas";

  // The table-wide hash schema has 2 buckets on the "key" column, but both
  // ranges added below come with hash schemas of their own.
  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 2);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range without a lower bound, ending at 0: 3 buckets hashing on the "key"
  // column with hash seed 1.
  {
    auto below_zero = CreateRangePartitionNoLowerBound(0);
    ASSERT_OK(below_zero->add_hash_partitions({ kKeyColumn }, 3, 1));
    creator->add_custom_range_partition(below_zero.release());
  }

  // Range starting at 0 without an upper bound: 5 buckets hashing on the
  // "key" column with hash seed 2.
  {
    auto from_zero = CreateRangePartitionNoUpperBound(0);
    ASSERT_OK(from_zero->add_hash_partitions({ kKeyColumn }, 5, 2));
    creator->add_custom_range_partition(from_zero.release());
  }

  ASSERT_OK(creator->Create());
  NO_FATALS(CheckTabletCount(kTableName, 8));  // 3 + 5 = 8

  // Insert rows in three batches, verifying the running total after each one.
  ASSERT_OK(InsertTestRows(kTableName, -250, -125));
  NO_FATALS(CheckTableRowsNum(kTableName, 125));

  ASSERT_OK(InsertTestRows(kTableName, 125, 250));
  NO_FATALS(CheckTableRowsNum(kTableName, 250));

  ASSERT_OK(InsertTestRows(kTableName, -125, 125));
  NO_FATALS(CheckTableRowsNum(kTableName, 500));
}
// Similar to the UnboundedRangesWithCustomHashSchemas above, but with
// an additional range in between that uses the table-wide hash schema.
TEST_F(FlexPartitioningCreateTableTest,
       TwoUnboundedRangesWithCustomHashSchemasAndOneInBetween) {
  constexpr const char* const kTableName =
      "TwoUnboundedRangesWithCustomHashSchemasAndOneInBetween";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 3);
  creator->set_range_partition_columns({ kKeyColumn });

  // Middle range with bounds -111 and 111: uses the table-wide hash schema
  // (3 buckets).
  {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, -111));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  // Range without a lower bound, ending at -111: 2 buckets hashing on the
  // "key" column with hash seed 1.
  {
    auto leftmost = CreateRangePartitionNoLowerBound(-111);
    ASSERT_OK(leftmost->add_hash_partitions({ kKeyColumn }, 2, 1));
    creator->add_custom_range_partition(leftmost.release());
  }

  // Range starting at 111 without an upper bound: 5 buckets hashing on the
  // "key" column with hash seed 2.
  {
    auto rightmost = CreateRangePartitionNoUpperBound(111);
    ASSERT_OK(rightmost->add_hash_partitions({ kKeyColumn }, 5, 2));
    creator->add_custom_range_partition(rightmost.release());
  }

  ASSERT_OK(creator->Create());
  NO_FATALS(CheckTabletCount(kTableName, 10));  // 3 + 2 + 5 = 10

  // Rows must be insertable into each of the partitions created above.
  ASSERT_OK(InsertTestRows(kTableName, -500, 500));
  NO_FATALS(CheckTableRowsNum(kTableName, 1000));
}
TEST_F(FlexPartitioningCreateTableTest, ScansWithRangePredicates) {
  constexpr const char* const kTableName = "ScansWithRangePredicates";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 2);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range with bounds 0 and 111: table-wide hash schema (2 buckets).
  {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, 0));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  // Range with bounds 111 and 222: 4 buckets hashing on the "key" column
  // with hash seed 1.
  {
    auto four_bucket_range = CreateRangePartition(111, 222);
    ASSERT_OK(four_bucket_range->add_hash_partitions({ kKeyColumn }, 4, 1));
    creator->add_custom_range_partition(four_bucket_range.release());
  }

  // Range with bounds 222 and 333: 3 buckets hashing on the "key" column
  // with hash seed 1.
  {
    auto three_bucket_range = CreateRangePartition(222, 333);
    ASSERT_OK(three_bucket_range->add_hash_partitions({ kKeyColumn }, 3, 1));
    creator->add_custom_range_partition(three_bucket_range.release());
  }

  ASSERT_OK(creator->Create());
  NO_FATALS(CheckTabletCount(kTableName, 9));  // 2 + 4 + 3 = 9

  // Populate the ranges one by one, checking the total row count and
  // the number of scanned tablets after each batch.
  ASSERT_OK(InsertTestRows(kTableName, 0, 111));
  NO_FATALS(CheckTableRowsNum(kTableName, 111));
  NO_FATALS(CheckTabletsScanned(kTableName, 9));

  ASSERT_OK(InsertTestRows(kTableName, 111, 222));
  NO_FATALS(CheckTableRowsNum(kTableName, 222));
  NO_FATALS(CheckTabletsScanned(kTableName, 9));

  ASSERT_OK(InsertTestRows(kTableName, 222, 333));
  NO_FATALS(CheckTableRowsNum(kTableName, 333));
  NO_FATALS(CheckTabletsScanned(kTableName, 9));

  // Filler values for lower or upper bound when it's not meant to be set.
  constexpr int kLowerFiller = INT8_MIN;
  constexpr int kUpperFiller = INT8_MAX;

  // One entry per CheckScanWithColumnPredicate() call; the fields are passed
  // as the third through sixth arguments in the declared order.
  // NOTE(review): 'rows' and 'tablets' are presumably the expected number of
  // returned rows and of scanned tablets -- confirm against the helper.
  struct ScanCase {
    int rows;
    int tablets;
    int lower;
    int upper;
  };
  const ScanCase kScanCases[] = {
    { 150, 6, 0, 150 },
    { 183, 7, 150, 333 },
    { 75, 2, 0, 75 },
    { 175, 9, 75, 250 },
    { 83, 3, 250, 333 },
    { 1, 1, 0, 1 },
    { 2, 6, 110, 112 },
    { 0, 0, -10, -5 },
    { 0, 0, 350, 400 },
    { 1, 1, 332, 333 },
    { 0, 0, 333, 334 },
    { 100, 2, kLowerFiller, 100 },
    { 250, 9, kLowerFiller, 250 },
    { 333, 9, kLowerFiller, 333 },
    { 333, 9, kLowerFiller, 334 },
    { 333, 9, -10, kUpperFiller },
    { 233, 9, 100, kUpperFiller },
    { 133, 7, 200, kUpperFiller },
    { 83, 3, 250, kUpperFiller },
    { 0, 0, 333, kUpperFiller },
  };
  for (const auto& c : kScanCases) {
    CheckScanWithColumnPredicate(
        kTableName, "key", c.rows, c.tablets, c.lower, c.upper);
  }

  // Meanwhile, inserting into non-covered ranges should result in a proper
  // error status returned to the client attempting such an operation.
  {
    constexpr int kNumRows = 10;
    vector<KuduError*> errors;
    ElementDeleter errors_cleanup(&errors);
    auto flush_status = InsertTestRows(
        kTableName, 334, 334 + kNumRows, KuduSession::MANUAL_FLUSH, &errors);
    ASSERT_TRUE(flush_status.IsIOError()) << flush_status.ToString();
    ASSERT_STR_CONTAINS(flush_status.ToString(), "failed to flush data");
    ASSERT_EQ(kNumRows, errors.size());
    // Each of the rejected rows must carry a NotFound status of its own.
    for (const auto& error : errors) {
      const auto& row_status = error->status();
      EXPECT_TRUE(row_status.IsNotFound()) << row_status.ToString();
      ASSERT_STR_CONTAINS(row_status.ToString(),
                          "No tablet covering the requested range partition");
    }
  }
}
TEST_F(FlexPartitioningCreateTableTest, ScansWithRangePredicatesWithSameHashBuckets) {
  constexpr const char* const kTableName = "ScansWithRangePredicatesWithSameHashBuckets";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 3);
  creator->set_range_partition_columns({ kKeyColumn });

  // Three adjacent ranges, every one of them using the table-wide hash
  // schema (3 buckets): { lower bound, upper bound }.
  const int32_t kRangeBounds[][2] = { { 0, 111 }, { 111, 222 }, { 222, 333 } };
  for (const auto& bounds : kRangeBounds) {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, bounds[0]));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, bounds[1]));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  ASSERT_OK(creator->Create());
  NO_FATALS(CheckTabletCount(kTableName, 9));  // 3 ranges x 3 buckets

  // Populate the ranges one by one: { first key, end key, expected total }.
  // The number of scanned tablets is checked after every batch as well.
  const int kInsertSteps[][3] = {
    { 0, 111, 111 },
    { 111, 222, 222 },
    { 222, 333, 333 },
  };
  for (const auto& step : kInsertSteps) {
    ASSERT_OK(InsertTestRows(kTableName, step[0], step[1]));
    NO_FATALS(CheckTableRowsNum(kTableName, step[2]));
    NO_FATALS(CheckTabletsScanned(kTableName, 9));
  }

  // Filler values for lower or upper bound when it's not meant to be set.
  constexpr int kLowerFiller = INT8_MIN;
  constexpr int kUpperFiller = INT8_MAX;

  // NOTE(review): this scenario used to carry a remark about test cases being
  // commented out because they failed here while not being reproducible in
  // a real cluster. No case is commented out in the list below, so that
  // remark looks obsolete -- confirm.
  //
  // One entry per CheckScanWithColumnPredicate() call; the fields are passed
  // as the third through sixth arguments in the declared order.
  // NOTE(review): 'rows' and 'tablets' are presumably the expected number of
  // returned rows and of scanned tablets -- confirm against the helper.
  struct ScanCase {
    int rows;
    int tablets;
    int lower;
    int upper;
  };
  const ScanCase kScanCases[] = {
    { 150, 6, 0, 150 },
    { 183, 6, 150, 333 },
    { 75, 3, 0, 75 },
    { 175, 9, 75, 250 },
    { 83, 3, 250, 333 },
    { 1, 1, 0, 1 },
    { 1, 1, 110, 111 },
    { 3, 6, 110, 113 },
    { 1, 1, 332, 333 },
    { 0, 0, 333, 334 },
    { 0, 0, -10, -5 },
    { 0, 0, 350, 400 },
    { 100, 3, kLowerFiller, 100 },
    { 250, 9, kLowerFiller, 250 },
    { 333, 9, kLowerFiller, 333 },
    { 333, 9, kLowerFiller, 334 },
    { 333, 9, -10, kUpperFiller },
    { 233, 9, 100, kUpperFiller },
    { 133, 6, 200, kUpperFiller },
    { 83, 3, 250, kUpperFiller },
    { 0, 0, 333, kUpperFiller },
  };
  for (const auto& c : kScanCases) {
    CheckScanWithColumnPredicate(
        kTableName, "key", c.rows, c.tablets, c.lower, c.upper);
  }

  // Meanwhile, inserting into non-covered ranges should result in a proper
  // error status returned to the client attempting such an operation.
  {
    constexpr int kNumRows = 10;
    vector<KuduError*> errors;
    ElementDeleter errors_cleanup(&errors);
    auto flush_status = InsertTestRows(
        kTableName, 334, 334 + kNumRows, KuduSession::MANUAL_FLUSH, &errors);
    ASSERT_TRUE(flush_status.IsIOError()) << flush_status.ToString();
    ASSERT_STR_CONTAINS(flush_status.ToString(), "failed to flush data");
    ASSERT_EQ(kNumRows, errors.size());
    // Each of the rejected rows must carry a NotFound status of its own.
    for (const auto& error : errors) {
      const auto& row_status = error->status();
      EXPECT_TRUE(row_status.IsNotFound()) << row_status.ToString();
      ASSERT_STR_CONTAINS(row_status.ToString(),
                          "No tablet covering the requested range partition");
    }
  }
}
TEST_F(FlexPartitioningCreateTableTest, ScansWithNonCoveringRanges) {
  constexpr const char* const kTableName = "ScansWithNonCoveringRanges";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 2);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range with bounds 0 and 111: 4 buckets hashing on the "key" column with
  // hash seed 1.
  {
    auto four_bucket_range = CreateRangePartition(0, 111);
    ASSERT_OK(four_bucket_range->add_hash_partitions({ kKeyColumn }, 4, 1));
    creator->add_custom_range_partition(four_bucket_range.release());
  }

  // Range with bounds 222 and 333: table-wide hash schema (2 buckets). The
  // keys between 111 and 222 are left without a range.
  {
    unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
    ASSERT_OK(range_begin->SetInt32(kKeyColumn, 222));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 333));
    creator->add_range_partition(range_begin.release(), range_end.release());
  }

  // Range with bounds 444 and 555: 3 buckets hashing on the "key" column
  // with hash seed 1. The keys between 333 and 444 are left without a range.
  {
    auto three_bucket_range = CreateRangePartition(444, 555);
    ASSERT_OK(three_bucket_range->add_hash_partitions({ kKeyColumn }, 3, 1));
    creator->add_custom_range_partition(three_bucket_range.release());
  }

  ASSERT_OK(creator->Create());
  NO_FATALS(CheckTabletCount(kTableName, 9));  // 4 + 2 + 3 = 9

  // Populate the ranges one by one, checking the total row count and
  // the number of scanned tablets after each batch.
  ASSERT_OK(InsertTestRows(kTableName, 0, 111));
  NO_FATALS(CheckTableRowsNum(kTableName, 111));
  NO_FATALS(CheckTabletsScanned(kTableName, 9));

  ASSERT_OK(InsertTestRows(kTableName, 222, 333));
  NO_FATALS(CheckTableRowsNum(kTableName, 222));
  NO_FATALS(CheckTabletsScanned(kTableName, 9));

  ASSERT_OK(InsertTestRows(kTableName, 444, 555));
  NO_FATALS(CheckTableRowsNum(kTableName, 333));
  NO_FATALS(CheckTabletsScanned(kTableName, 9));

  // Filler values for lower or upper bound when it's not meant to be set.
  constexpr int kLowerFiller = INT8_MIN;
  constexpr int kUpperFiller = INT8_MAX;

  // One entry per CheckScanWithColumnPredicate() call; the fields are passed
  // as the third through sixth arguments in the declared order.
  // NOTE(review): 'rows' and 'tablets' are presumably the expected number of
  // returned rows and of scanned tablets -- confirm against the helper.
  struct ScanCase {
    int rows;
    int tablets;
    int lower;
    int upper;
  };
  const ScanCase kScanCases[] = {
    { 111, 4, -10, 111 },
    { 111, 4, -10, 150 },
    { 111, 2, 150, 333 },
    { 33, 6, 100, 244 },
    { 111, 2, 144, 344 },
    { 333, 9, -50, 600 },
    { 222, 5, 150, 600 },
    { 1, 1, 0, 1 },
    { 0, 0, -10, -5 },
    { 0, 0, 600, 650 },
    { 1, 1, 554, 555 },
    { 0, 0, 555, 556 },
    { 111, 4, kLowerFiller, 150 },
    { 222, 6, kLowerFiller, 350 },
    { 333, 9, kLowerFiller, 560 },
    { 333, 9, -10, kUpperFiller },
    { 222, 5, 150, kUpperFiller },
    { 111, 3, 350, kUpperFiller },
  };
  for (const auto& c : kScanCases) {
    CheckScanWithColumnPredicate(
        kTableName, "key", c.rows, c.tablets, c.lower, c.upper);
  }
}
// Negative test scenarios to cover non-OK status codes for various operations
// related to custom hash bucket schema per range.
TEST_F(FlexPartitioningCreateTableTest, Negatives) {
  // Hash partitioning over an empty set of columns is rejected as soon as
  // it is added to the range.
  {
    auto range = CreateRangePartition();
    const auto status = range->add_hash_partitions({}, 2, 0);
    ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
    ASSERT_STR_CONTAINS(
        status.ToString(),
        "set of columns for hash partitioning must not be empty");
  }

  // Hash partitioning with just one bucket is rejected as well.
  {
    auto range = CreateRangePartition();
    const auto status = range->add_hash_partitions({ kKeyColumn }, 1, 0);
    ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
    ASSERT_STR_CONTAINS(
        status.ToString(),
        "at least two buckets are required to establish hash partitioning");
  }

  // Hash partitioning on a non-existent column is accepted when added to the
  // range: the appropriate error surfaces during table creation.
  {
    RangePartitions ranges;
    ranges.emplace_back(CreateRangePartition());
    auto& range = ranges.back();
    ASSERT_OK(range->add_hash_partitions({ "nonexistent" }, 2, 0));
    const auto status = CreateTable("nicetry", std::move(ranges));
    ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
    ASSERT_STR_CONTAINS(status.ToString(),
                        "unknown column: name: \"nonexistent\"");
  }

  // Try creating a table where both range splits and a custom hash bucket
  // schema for a range are specified -- that should not be possible.
  {
    RangePartition range(CreateRangePartition(0, 100));
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 2, 0));

    unique_ptr<KuduPartialRow> split_point(schema_.NewRow());
    ASSERT_OK(split_point->SetInt32(kKeyColumn, 50));

    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    table_creator->table_name("nicetry");
    table_creator->schema(&schema_);
    table_creator->num_replicas(1);
    table_creator->set_range_partition_columns({ kKeyColumn });
    table_creator->add_range_partition_split(split_point.release());
    table_creator->add_custom_range_partition(range.release());

    const auto status = table_creator->Create();
    ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
    ASSERT_STR_CONTAINS(
        status.ToString(),
        "split rows and custom hash bucket schemas for ranges are incompatible: "
        "choose one or the other");
  }

  // Same as the sub-scenario above, but using deprecated client API to specify
  // so-called split rows.
  {
    RangePartition range(CreateRangePartition(0, 100));
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 2, 0));

    unique_ptr<KuduPartialRow> split_point(schema_.NewRow());
    ASSERT_OK(split_point->SetInt32(kKeyColumn, 50));

    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    table_creator->table_name("nicetry");
    table_creator->schema(&schema_);
    table_creator->num_replicas(1);
    table_creator->set_range_partition_columns({ kKeyColumn });
    table_creator->add_custom_range_partition(range.release());
    // split_rows() is deprecated: silence the compiler warning for this call.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
    table_creator->split_rows({ split_point.release() });
#pragma GCC diagnostic pop

    const auto status = table_creator->Create();
    ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
    ASSERT_STR_CONTAINS(
        status.ToString(),
        "split rows and custom hash bucket schemas for ranges are incompatible: "
        "choose one or the other");
  }

  // Two hash dimensions on the very same column.
  {
    constexpr const char* const kTableName = "3@key_x_3@key";
    RangePartitions ranges;
    ranges.emplace_back(CreateRangePartition());
    auto& range = ranges.back();
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 3, 0));
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 3, 1));
    const auto status = CreateTable(kTableName, std::move(ranges));
    ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
    ASSERT_STR_CONTAINS(status.ToString(),
                        "hash bucket schema components must not contain "
                        "columns in common");
  }

  // Two hash dimensions sharing one column.
  {
    constexpr const char* const kTableName = "3@key_x_3@key:int_val";
    RangePartitions ranges;
    ranges.emplace_back(CreateRangePartition());
    auto& range = ranges.back();
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 2, 0));
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn, kIntValColumn }, 3, 1));
    const auto status = CreateTable(kTableName, std::move(ranges));
    ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
    ASSERT_STR_CONTAINS(status.ToString(),
                        "hash bucket schema components must not contain "
                        "columns in common");
  }

  // A hash dimension on the kIntValColumn column; the expected error says
  // that only primary key columns may be used.
  {
    constexpr const char* const kTableName = "3@int_val";
    RangePartitions ranges;
    ranges.emplace_back(CreateRangePartition());
    auto& range = ranges.back();
    ASSERT_OK(range->add_hash_partitions({ kIntValColumn }, 2, 0));
    const auto status = CreateTable(kTableName, std::move(ranges));
    ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
    ASSERT_STR_CONTAINS(status.ToString(),
                        "must specify only primary key columns for hash "
                        "bucket partition components");
  }

  // Same as above, but for the kStringValColumn column.
  {
    constexpr const char* const kTableName = "3@string_val";
    RangePartitions ranges;
    ranges.emplace_back(CreateRangePartition());
    auto& range = ranges.back();
    ASSERT_OK(range->add_hash_partitions({ kStringValColumn }, 5, 0));
    const auto status = CreateTable(kTableName, std::move(ranges));
    ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
    ASSERT_STR_CONTAINS(status.ToString(),
                        "must specify only primary key columns for hash "
                        "bucket partition components");
  }

  // A hash dimension on kKeyColumn combined with another one on
  // kIntValColumn: the same error is expected.
  {
    constexpr const char* const kTableName = "2@key_x_3@int_val";
    RangePartitions ranges;
    ranges.emplace_back(CreateRangePartition());
    auto& range = ranges.back();
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 2, 0));
    ASSERT_OK(range->add_hash_partitions({ kIntValColumn }, 3, 1));
    const auto status = CreateTable(kTableName, std::move(ranges));
    ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
    ASSERT_STR_CONTAINS(status.ToString(),
                        "must specify only primary key columns for hash "
                        "bucket partition components");
  }
}
// This test scenario verifies that ListPartitions() returns correct number
// of partitions for tables with unbounded ranges with custom hash schemas.
// Essentially, this scenario makes sure the logic used by the code to iterate
// over range partitions by employing PartitionPruner's NextPartitionKey() and
// RemovePartitionKeyRange() methods works as expected.
TEST_F(FlexPartitioningCreateTableTest, UnboundedRangesOneDimensionHash) {
  constexpr const char* const kTableName = "UnboundedRangesOneDimensionHash";

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 2);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range with the table-wide hash schema (2 buckets). The row for the lower
  // bound is left without any column set; the upper bound is -10.
  {
    unique_ptr<KuduPartialRow> unset_lower(schema_.NewRow());
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, -10));
    creator->add_range_partition(unset_lower.release(), range_end.release());
  }

  // Range with bounds -10 and 10: custom hash schema of 5 buckets hashing on
  // the "key" column with hash seed 1.
  {
    auto middle = CreateRangePartition(-10, 10);
    ASSERT_OK(middle->add_hash_partitions({ kKeyColumn }, 5, 1));
    creator->add_custom_range_partition(middle.release());
  }

  // Range starting at 10 without an upper bound: custom hash schema of
  // 3 buckets hashing on the "key" column with hash seed 2.
  {
    auto rightmost = CreateRangePartitionNoUpperBound(10);
    ASSERT_OK(rightmost->add_hash_partitions({ kKeyColumn }, 3, 2));
    creator->add_custom_range_partition(rightmost.release());
  }

  ASSERT_OK(creator->Create());
  NO_FATALS(CheckTabletCount(kTableName, 10));  // 2 + 5 + 3 = 10

  // ListPartitions() must report the very same number of partitions.
  shared_ptr<KuduTable> opened_table;
  ASSERT_OK(client_->OpenTable(kTableName, &opened_table));
  vector<Partition> listed;
  ASSERT_OK(opened_table->ListPartitions(&listed));
  ASSERT_EQ(10, listed.size());

  // Rows must be insertable into each of the partitions created above.
  ASSERT_OK(InsertTestRows(kTableName, -50, 50));
  NO_FATALS(CheckTableRowsNum(kTableName, 100));
}
// Similar to FlexPartitioningCreateTableTest.UnboundedRangesOneDimensionHash
// above, but with two hash dimensions.
TEST_F(FlexPartitioningCreateTableTest, UnboundedRangesTwoDimensionHash) {
  constexpr const char* const kTableName = "UnboundedRangesTwoDimensionHash";
  constexpr const char* const kC0 = "c0";
  constexpr const char* const kC1 = "c1";
  constexpr const char* const kC2 = "c2";

  // This scenario uses a schema of its own: a compound primary key made of
  // two INT32 columns (so there is a column per hash dimension) and
  // a nullable STRING column for the payload.
  KuduSchemaBuilder b;
  b.AddColumn(kC0)->Type(KuduColumnSchema::INT32)->NotNull();
  b.AddColumn(kC1)->Type(KuduColumnSchema::INT32)->NotNull();
  b.AddColumn(kC2)->Type(KuduColumnSchema::STRING)->Nullable();
  b.SetPrimaryKey({ kC0, kC1 });
  KuduSchema schema;
  ASSERT_OK(b.Build(&schema));

  // Inserts one row for each key value in [key_beg, key_end) into the table
  // named kTableName, setting both key columns to the key value and the string
  // column to a random number. The rows are accumulated in a MANUAL_FLUSH
  // session and flushed at once; the returned status is the first non-OK
  // status encountered or, if none, the result of the flush.
  auto rows_inserter = [&](int32_t key_beg, int32_t key_end) {
    CHECK_LE(key_beg, key_end);
    shared_ptr<KuduTable> table;
    RETURN_NOT_OK(client_->OpenTable(kTableName, &table));
    shared_ptr<KuduSession> session = client_->NewSession();
    RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
    session->SetTimeoutMillis(60000);
    for (int32_t key_val = key_beg; key_val < key_end; ++key_val) {
      unique_ptr<KuduInsert> insert(table->NewInsert());
      RETURN_NOT_OK(insert->mutable_row()->SetInt32(kC0, key_val));
      RETURN_NOT_OK(insert->mutable_row()->SetInt32(kC1, key_val));
      RETURN_NOT_OK(insert->mutable_row()->SetStringCopy(kC2, std::to_string(rand())));
      RETURN_NOT_OK(session->Apply(insert.release()));
    }
    return session->Flush();
  };

  // Two range partitions: [-inf, 0) [0, +inf)
  {
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    table_creator->table_name(kTableName)
        .schema(&schema)
        .num_replicas(1)
        .add_hash_partitions({ kC0 }, 3)
        .add_hash_partitions({ kC1 }, 3)
        .set_range_partition_columns({ kC0 });

    // Add a range partition with the table-wide hash schema: the row for
    // the lower bound is left without any column set.
    {
      unique_ptr<KuduPartialRow> lower(schema.NewRow());
      unique_ptr<KuduPartialRow> upper(schema.NewRow());
      ASSERT_OK(upper->SetInt32(kC0, 0));
      table_creator->add_range_partition(lower.release(), upper.release());
    }

    // Add a range partition with custom hash schema: the row for the upper
    // bound is left without any column set.
    {
      unique_ptr<KuduPartialRow> lower(schema.NewRow());
      ASSERT_OK(lower->SetInt32(kC0, 0));
      unique_ptr<KuduRangePartition> p(
          new KuduRangePartition(lower.release(), schema.NewRow()));
      ASSERT_OK(p->add_hash_partitions({ kC0 }, 2, 0));
      ASSERT_OK(p->add_hash_partitions({ kC1 }, 2, 1));
      table_creator->add_custom_range_partition(p.release());
    }

    ASSERT_OK(table_creator->Create());
    NO_FATALS(CheckTabletCount(kTableName, 13));  // 3 * 3 + 2 * 2 = 13

    // ListPartitions() must report the very same number of partitions.
    shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(kTableName, &table));
    vector<Partition> partitions;
    ASSERT_OK(table->ListPartitions(&partitions));
    ASSERT_EQ(13, partitions.size());

    ASSERT_OK(rows_inserter(-100, 100));
    NO_FATALS(CheckTableRowsNum(kTableName, 200));

    // The next sub-scenario creates a table with the same name.
    ASSERT_OK(client_->DeleteTable(kTableName));
  }

  // Three range partitions: [-inf, -10) [-10, 10) [10, +inf)
  {
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    table_creator->table_name(kTableName)
        .schema(&schema)
        .num_replicas(1)
        .add_hash_partitions({ kC0 }, 2)
        .add_hash_partitions({ kC1 }, 2)
        .set_range_partition_columns({ kC0 });

    // Add a range partition with the table-wide hash schema.
    {
      unique_ptr<KuduPartialRow> lower(schema.NewRow());
      unique_ptr<KuduPartialRow> upper(schema.NewRow());
      ASSERT_OK(upper->SetInt32(kC0, -10));
      table_creator->add_range_partition(lower.release(), upper.release());
    }

    // Add two range partitions with custom hash schemas.
    {
      auto p = CreateRangePartition(schema, kC0, -10, 10);
      ASSERT_OK(p->add_hash_partitions({ kC0 }, 4, 0));
      ASSERT_OK(p->add_hash_partitions({ kC1 }, 5, 1));
      table_creator->add_custom_range_partition(p.release());
    }
    {
      unique_ptr<KuduPartialRow> lower(schema.NewRow());
      ASSERT_OK(lower->SetInt32(kC0, 10));
      unique_ptr<KuduRangePartition> p(
          new KuduRangePartition(lower.release(), schema.NewRow()));
      ASSERT_OK(p->add_hash_partitions({ kC0 }, 3, 2));
      ASSERT_OK(p->add_hash_partitions({ kC1 }, 4, 3));
      table_creator->add_custom_range_partition(p.release());
    }

    ASSERT_OK(table_creator->Create());
    NO_FATALS(CheckTabletCount(kTableName, 36));  // 4 + 20 + 12 = 36

    // ListPartitions() must report the very same number of partitions.
    shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(kTableName, &table));
    vector<Partition> partitions;
    ASSERT_OK(table->ListPartitions(&partitions));
    ASSERT_EQ(36, partitions.size());

    ASSERT_OK(rows_inserter(-250, 250));
    NO_FATALS(CheckTableRowsNum(kTableName, 500));
    ASSERT_OK(client_->DeleteTable(kTableName));
  }
}
// When working with a cluster that doesn't support range-specific hash schemas
// for tables, the client should receive proper error while trying to create
// a table with custom hash schema for at least one of its ranges.
TEST_F(FlexPartitioningCreateTableTest, UnsupportedRangeSpecificHashSchema) {
  constexpr const char* const kTableName =
      "UnsupportedRangeSpecificHashSchemaCreateTable";

  // Emulate a Kudu cluster running an older version released prior to
  // the introduction of range-specific hash schemas by switching the support
  // for the feature off.
  FLAGS_enable_per_range_hash_schemas = false;

  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 3);
  creator->set_range_partition_columns({ kKeyColumn });

  // Range with bounds 0 and 111: uses the table-wide hash schema.
  unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
  ASSERT_OK(range_begin->SetInt32(kKeyColumn, 0));
  unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
  ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
  creator->add_range_partition(range_begin.release(), range_end.release());

  // Range with bounds 111 and 222: custom hash schema of 5 buckets. Having
  // it in the request is what makes the table creation fail.
  auto custom_range = CreateRangePartition(111, 222);
  ASSERT_OK(custom_range->add_hash_partitions({ kKeyColumn }, 5, 0));
  creator->add_custom_range_partition(custom_range.release());

  const auto status = creator->Create();
  ASSERT_TRUE(status.IsNotSupported()) << status.ToString();
  ASSERT_STR_CONTAINS(status.ToString(),
                      "cluster does not support CreateTable with "
                      "feature(s) RANGE_SPECIFIC_HASH_SCHEMA");
}
// Test fixture for scenarios where range partitions with custom hash schemas
// are added to (or dropped from) an already existing table by altering it.
// The fixture adds nothing of its own on top of FlexPartitioningTest.
class FlexPartitioningAlterTableTest : public FlexPartitioningTest {};
TEST_F(FlexPartitioningAlterTableTest, AddAndDropCustomRangePartition) {
  // The table is created with the following partitions:
  //
  //            hash bucket
  //   key    0           1
  //         -------------------------
  //  <111    x:{key}     x:{key}
  //
  // The following partition is added later on:
  //            hash bucket
  //   key    0           1           2
  //         --------------------------------
  // 111-222  x:{key}     x:{key}     x:{key}
  constexpr const char* const kTableName = "AddAndDropCustomRangePartition";

  // Create the table with a single range partition using the table-wide hash
  // schema (2 buckets).
  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  unique_ptr<KuduPartialRow> range_begin(schema_.NewRow());
  ASSERT_OK(range_begin->SetInt32(kKeyColumn, INT32_MIN));
  unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
  ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
  creator->table_name(kTableName);
  creator->schema(&schema_);
  creator->num_replicas(1);
  creator->add_hash_partitions({ kKeyColumn }, 2);
  creator->set_range_partition_columns({ kKeyColumn });
  creator->add_range_partition(range_begin.release(), range_end.release());
  ASSERT_OK(creator->Create());
  NO_FATALS(CheckTabletCount(kTableName, 2));

  // Add a range with bounds 111 and 222 having a custom hash schema:
  // 3 buckets hashing on the "key" column with hash seed 1.
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    auto new_range = CreateRangePartition(111, 222);
    ASSERT_OK(new_range->add_hash_partitions({ kKeyColumn }, 3, 1));
    alterer->AddRangePartition(new_range.release());
    ASSERT_OK(alterer->Alter());
  }
  NO_FATALS(CheckTabletCount(kTableName, 5));  // 2 + 3 = 5

  // Try adding a range with bounds 222 and 333 having two hash dimensions:
  //   3 buckets hashing on the "key" column with hash seed 1
  //   2 buckets hashing on the "key" column with hash seed 1
  // The number of hash dimensions differs from what the table already has,
  // so the attempt is expected to fail.
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    auto bad_range = CreateRangePartition(222, 333);
    ASSERT_OK(bad_range->add_hash_partitions({ kKeyColumn }, 3, 1));
    ASSERT_OK(bad_range->add_hash_partitions({ kKeyColumn }, 2, 1));
    alterer->AddRangePartition(bad_range.release());
    const auto status = alterer->Alter();
    ASSERT_TRUE(status.IsNotSupported()) << status.ToString();
    ASSERT_STR_CONTAINS(
        status.ToString(),
        "varying number of hash dimensions per range is not yet supported");
  }

  // Drop the range that was added successfully: the tablet count gets back to
  // what it was right after creating the table.
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    unique_ptr<KuduPartialRow> drop_begin(schema_.NewRow());
    ASSERT_OK(drop_begin->SetInt32(kKeyColumn, 111));
    unique_ptr<KuduPartialRow> drop_end(schema_.NewRow());
    ASSERT_OK(drop_end->SetInt32(kKeyColumn, 222));
    alterer->DropRangePartition(drop_begin.release(), drop_end.release());
    ASSERT_OK(alterer->Alter());
  }
  NO_FATALS(CheckTabletCount(kTableName, 2));
}
TEST_F(FlexPartitioningAlterTableTest, ReadAndWriteToCustomRangePartition) {
  // Scenario: the table starts with one range partition that uses the
  // table-wide hash schema, then three more ranges are added, each with its
  // own hash schema. Rows are written into and scanned from every range, one
  // of the added ranges is dropped, and finally writes into keys that no
  // range covers are attempted.
  //
  // Layout right after creating the table:
  //
  //            hash bucket
  //   key      0        1        2        3
  //   -----------------------------------------
  //   <111     x:{key}  x:{key}  -        -
  //
  // Ranges added afterwards:
  //
  //            hash bucket
  //   key      0        1        2        3
  //   -----------------------------------------
  //   111-222  x:{key}  x:{key}  x:{key}  -
  //   222-333  x:{key}  x:{key}  x:{key}  x:{key}
  //   333-444  x:{key}  x:{key}  -        -
  constexpr const char* const kTableName = "ReadAndWriteToCustomRangePartition";

  // Create the table: its only range ends at 111 and is split into the two
  // table-wide hash buckets.
  {
    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
    unique_ptr<KuduPartialRow> range_start(schema_.NewRow());
    ASSERT_OK(range_start->SetInt32(kKeyColumn, INT32_MIN));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->table_name(kTableName)
        .schema(&schema_)
        .num_replicas(1)
        .add_hash_partitions({ kKeyColumn }, 2)
        .set_range_partition_columns({ kKeyColumn })
        .add_range_partition(range_start.release(), range_end.release());
    ASSERT_OK(creator->Create());
  }

  // The three new ranges are queued on one alterer and applied by a single
  // Alter() call.
  unique_ptr<KuduTableAlterer> range_adder(client_->NewTableAlterer(kTableName));
  {
    // 111-222: 3 buckets hashing on the "key" column with hash seed 1.
    auto range = CreateRangePartition(111, 222);
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 3, 1));
    range_adder->AddRangePartition(range.release());
  }
  {
    // 222-333: 4 buckets hashing on the "key" column with hash seed 2.
    auto range = CreateRangePartition(222, 333);
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 4, 2));
    range_adder->AddRangePartition(range.release());
  }
  {
    // 333-444: 2 buckets hashing on the "key" column with hash seed 3.
    auto range = CreateRangePartition(333, 444);
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 2, 3));
    range_adder->AddRangePartition(range.release());
  }
  ASSERT_OK(range_adder->Alter());
  // 2 + 3 + 4 + 2 = 11 tablets in total.
  NO_FATALS(CheckTabletCount(kTableName, 11));

  // Rows must be insertable into every existing partition: first the range
  // with the table-wide hash schema, then the ranges with custom ones.
  ASSERT_OK(InsertTestRows(kTableName, 0, 111));
  NO_FATALS(CheckTableRowsNum(kTableName, 111));
  ASSERT_OK(InsertTestRows(kTableName, 111, 444));
  NO_FATALS(CheckTableRowsNum(kTableName, 444));
  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, 111, 222, 111));
  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, 0, 444, 444));

  // Drop the 111-222 range and scan again: the 111 rows stored in it are
  // expected to be gone.
  {
    unique_ptr<KuduTableAlterer> range_dropper(client_->NewTableAlterer(kTableName));
    unique_ptr<KuduPartialRow> dropped_start(schema_.NewRow());
    ASSERT_OK(dropped_start->SetInt32(kKeyColumn, 111));
    unique_ptr<KuduPartialRow> dropped_end(schema_.NewRow());
    ASSERT_OK(dropped_end->SetInt32(kKeyColumn, 222));
    range_dropper->DropRangePartition(dropped_start.release(), dropped_end.release());
    ASSERT_OK(range_dropper->Alter());
  }
  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, 0, 444, 333));

  // Writing into keys that no range covers must fail, and every rejected row
  // must be reported back to the client with a proper error status.
  {
    constexpr int kNumRows = 10;
    vector<KuduError*> row_errors;
    // Frees the collected KuduError objects when leaving this scope.
    ElementDeleter errors_cleanup(&row_errors);
    auto flush_status = InsertTestRows(
        kTableName, 445, 445 + kNumRows, KuduSession::MANUAL_FLUSH, &row_errors);
    ASSERT_TRUE(flush_status.IsIOError()) << flush_status.ToString();
    ASSERT_STR_CONTAINS(flush_status.ToString(), "failed to flush data");
    ASSERT_EQ(kNumRows, row_errors.size());
    for (const auto& row_error : row_errors) {
      const auto& row_status = row_error->status();
      EXPECT_TRUE(row_status.IsNotFound()) << row_status.ToString();
      ASSERT_STR_CONTAINS(row_status.ToString(),
                          "No tablet covering the requested range partition");
    }
  }
}
TEST_F(FlexPartitioningAlterTableTest, ReadAndWriteToUnboundedCustomRangePartition) {
  // Scenario: the table starts with the range -100..100 using the table-wide
  // hash schema (2 buckets). Two ranges with custom hash schemas are then
  // added on either side of it: one without a lower bound (3 buckets) and one
  // without an upper bound (4 buckets). Rows are written and scanned, the
  // middle range is dropped, and the table is scanned again.
  constexpr const char* const kTableName =
      "ReadAndWriteToUnboundedCustomRangePartition";

  // Create the table with the single bounded range in the middle.
  {
    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
    unique_ptr<KuduPartialRow> range_start(schema_.NewRow());
    ASSERT_OK(range_start->SetInt32(kKeyColumn, -100));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 100));
    creator->table_name(kTableName)
        .schema(&schema_)
        .num_replicas(1)
        .add_hash_partitions({ kKeyColumn }, 2)
        .set_range_partition_columns({ kKeyColumn })
        .add_range_partition(range_start.release(), range_end.release());
    ASSERT_OK(creator->Create());
  }

  // Both unbounded ranges are queued on one alterer and applied together.
  unique_ptr<KuduTableAlterer> range_adder(client_->NewTableAlterer(kTableName));
  {
    // No lower bound, ends at -100: 3 buckets on the key column, hash seed 1.
    auto range = CreateRangePartitionNoLowerBound(-100);
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 3, 1));
    range_adder->AddRangePartition(range.release());
  }
  {
    // Starts at 100, no upper bound: 4 buckets on the key column, hash seed 2.
    auto range = CreateRangePartitionNoUpperBound(100);
    ASSERT_OK(range->add_hash_partitions({ kKeyColumn }, 4, 2));
    range_adder->AddRangePartition(range.release());
  }
  ASSERT_OK(range_adder->Alter());
  NO_FATALS(CheckTabletCount(kTableName, 9));  // 2 + 3 + 4 = 9

  // Rows must be insertable into every existing partition: first the range
  // with the table-wide hash schema, then the ranges with custom ones. The
  // expected total grows by 100 rows with each additional batch.
  ASSERT_OK(InsertTestRows(kTableName, -100, 100));
  NO_FATALS(CheckTableRowsNum(kTableName, 200));
  ASSERT_OK(InsertTestRows(kTableName, -200, -100));
  NO_FATALS(CheckTableRowsNum(kTableName, 300));
  ASSERT_OK(InsertTestRows(kTableName, 100, 200));
  NO_FATALS(CheckTableRowsNum(kTableName, 400));
  // Also exercise keys at the very extremes of the INT32 domain.
  ASSERT_OK(InsertTestRows(kTableName, INT32_MIN, INT32_MIN + 100));
  NO_FATALS(CheckTableRowsNum(kTableName, 500));
  ASSERT_OK(InsertTestRows(kTableName, INT32_MAX - 100, INT32_MAX));
  NO_FATALS(CheckTableRowsNum(kTableName, 600));
  // Only the 400 rows written with keys from -200 to 200 are expected here.
  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -300, 300, 400));

  // Drop the range in the middle (-100..100) and re-scan with various bounds:
  // the 200 rows stored in it are expected to be gone.
  {
    unique_ptr<KuduTableAlterer> range_dropper(client_->NewTableAlterer(kTableName));
    unique_ptr<KuduPartialRow> dropped_start(schema_.NewRow());
    ASSERT_OK(dropped_start->SetInt32(kKeyColumn, -100));
    unique_ptr<KuduPartialRow> dropped_end(schema_.NewRow());
    ASSERT_OK(dropped_end->SetInt32(kKeyColumn, 100));
    range_dropper->DropRangePartition(dropped_start.release(), dropped_end.release());
    ASSERT_OK(range_dropper->Alter());
  }
  NO_FATALS(CheckTableRowsNum(kTableName, 400));
  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, 0, 300, 100));
  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -300, 0, 100));
  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -200, 200, 200));
  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -500, 500, 200));
}
// A client talking to a cluster without support for range-specific hash
// schemas should get a proper error when it tries to add a range with a custom
// hash schema, while dropping such ranges and deleting the table should still
// work.
TEST_F(FlexPartitioningAlterTableTest, UnsupportedRangeSpecificHashSchema) {
  constexpr const char* const kTableName =
      "UnsupportedRangeSpecificHashSchemaAlterTable";
  constexpr const char* const kErrMsg = "cluster does not support AlterTable "
      "with feature(s) RANGE_SPECIFIC_HASH_SCHEMA";

  // Switch the feature off to emulate a Kudu cluster running an older version
  // released before range-specific hash schemas were introduced.
  FLAGS_enable_per_range_hash_schemas = false;

  // Create the table with a single range 0-111 that uses the table-wide hash
  // partitioning rules (2 buckets on the key column).
  {
    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
    creator->table_name(kTableName)
        .schema(&schema_)
        .num_replicas(1)
        .add_hash_partitions({ kKeyColumn }, 2)
        .set_range_partition_columns({ kKeyColumn });
    unique_ptr<KuduPartialRow> range_start(schema_.NewRow());
    ASSERT_OK(range_start->SetInt32(kKeyColumn, 0));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_start.release(), range_end.release());
    ASSERT_OK(creator->Create());
  }

  // Adding a single range with a custom hash schema must be rejected.
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    auto custom_range = CreateRangePartition(111, 222);
    ASSERT_OK(custom_range->add_hash_partitions({ kKeyColumn }, 5, 0));
    alterer->AddRangePartition(custom_range.release());
    const auto alter_status = alterer->Alter();
    ASSERT_TRUE(alter_status.IsNotSupported()) << alter_status.ToString();
    ASSERT_STR_CONTAINS(alter_status.ToString(), kErrMsg);
  }

  // A mix of ranges in one request (one with the table-wide hash schema, one
  // with a custom hash schema) must be rejected as well.
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    unique_ptr<KuduPartialRow> range_start(schema_.NewRow());
    ASSERT_OK(range_start->SetInt32(kKeyColumn, 111));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 222));
    alterer->AddRangePartition(range_start.release(), range_end.release());
    auto custom_range = CreateRangePartition(222, 333);
    ASSERT_OK(custom_range->add_hash_partitions({ kKeyColumn }, 8, 0));
    alterer->AddRangePartition(custom_range.release());
    const auto alter_status = alterer->Alter();
    ASSERT_TRUE(alter_status.IsNotSupported()) << alter_status.ToString();
    ASSERT_STR_CONTAINS(alter_status.ToString(), kErrMsg);
  }

  // Temporarily enable the RANGE_SPECIFIC_HASH_SCHEMA feature so that ranges
  // with custom hash schemas can be added; one of them is dropped later.
  FLAGS_enable_per_range_hash_schemas = true;
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    auto first_range = CreateRangePartition(111, 222);
    ASSERT_OK(first_range->add_hash_partitions({ kKeyColumn }, 5, 0));
    alterer->AddRangePartition(first_range.release());
    auto second_range = CreateRangePartition(222, 333);
    ASSERT_OK(second_range->add_hash_partitions({ kKeyColumn }, 8, 0));
    alterer->AddRangePartition(second_range.release());
    ASSERT_OK(alterer->Alter());
  }

  // Switch the RANGE_SPECIFIC_HASH_SCHEMA feature off again.
  FLAGS_enable_per_range_hash_schemas = false;

  // Dropping ranges with the table-wide or custom hash schemas should be fine
  // even if the cluster doesn't support the RANGE_SPECIFIC_HASH_SCHEMA
  // feature. It's rather a hypothetical situation unless they toggle the flag
  // after adding ranges with custom hash schemas or run older Kudu binaries
  // with new data. Here the 111-222 range added above is dropped.
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    unique_ptr<KuduPartialRow> dropped_start(schema_.NewRow());
    ASSERT_OK(dropped_start->SetInt32(kKeyColumn, 111));
    unique_ptr<KuduPartialRow> dropped_end(schema_.NewRow());
    ASSERT_OK(dropped_end->SetInt32(kKeyColumn, 222));
    alterer->DropRangePartition(dropped_start.release(), dropped_end.release());
    ASSERT_OK(alterer->Alter());
  }

  // Deleting a table that has ranges with custom hash schemas should be fine
  // even if the server side has no RANGE_SPECIFIC_HASH_SCHEMA feature.
  ASSERT_OK(client_->DeleteTable(kTableName));
}
// Verify that ranges with the table-wide hash schema can be added to and
// dropped from a table that also contains a range with a custom hash schema.
TEST_F(FlexPartitioningAlterTableTest, AddDropTableWideHashSchemaPartitions) {
  constexpr const char* const kTableName =
      "AddDropTableWideHashSchemaPartitions";

  // Create the table with the single range 0-111 using the table-wide hash
  // partitioning rules (2 buckets on the key column).
  {
    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
    creator->table_name(kTableName)
        .schema(&schema_)
        .num_replicas(1)
        .add_hash_partitions({ kKeyColumn }, 2)
        .set_range_partition_columns({ kKeyColumn });
    unique_ptr<KuduPartialRow> range_start(schema_.NewRow());
    ASSERT_OK(range_start->SetInt32(kKeyColumn, 0));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 111));
    creator->add_range_partition(range_start.release(), range_end.release());
    ASSERT_OK(creator->Create());
  }
  ASSERT_OK(InsertTestRows(kTableName, 0, 111));
  NO_FATALS(CheckTableRowsNum(kTableName, 111));

  // To get a mix of ranges, add one range (-111..0) with a custom hash schema:
  // 5 buckets on the key column with hash seed 1.
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    auto custom_range = CreateRangePartition(-111, 0);
    ASSERT_OK(custom_range->add_hash_partitions({ kKeyColumn }, 5, 1));
    alterer->AddRangePartition(custom_range.release());
    ASSERT_OK(alterer->Alter());
  }
  ASSERT_OK(InsertTestRows(kTableName, -111, 0));
  NO_FATALS(CheckTableRowsNum(kTableName, 222));

  // Add one more range (111-222) that uses the table-wide hash schema.
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    unique_ptr<KuduPartialRow> range_start(schema_.NewRow());
    ASSERT_OK(range_start->SetInt32(kKeyColumn, 111));
    unique_ptr<KuduPartialRow> range_end(schema_.NewRow());
    ASSERT_OK(range_end->SetInt32(kKeyColumn, 222));
    alterer->AddRangePartition(range_start.release(), range_end.release());
    ASSERT_OK(alterer->Alter());
  }
  ASSERT_OK(InsertTestRows(kTableName, 111, 222));
  NO_FATALS(CheckTableRowsNum(kTableName, 333));

  // Drop both ranges that use the table-wide hash schema with a single Alter()
  // call: first 111-222, then 0-111.
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    {
      unique_ptr<KuduPartialRow> dropped_start(schema_.NewRow());
      ASSERT_OK(dropped_start->SetInt32(kKeyColumn, 111));
      unique_ptr<KuduPartialRow> dropped_end(schema_.NewRow());
      ASSERT_OK(dropped_end->SetInt32(kKeyColumn, 222));
      alterer->DropRangePartition(dropped_start.release(), dropped_end.release());
    }
    {
      unique_ptr<KuduPartialRow> dropped_start(schema_.NewRow());
      ASSERT_OK(dropped_start->SetInt32(kKeyColumn, 0));
      unique_ptr<KuduPartialRow> dropped_end(schema_.NewRow());
      ASSERT_OK(dropped_end->SetInt32(kKeyColumn, 111));
      alterer->DropRangePartition(dropped_start.release(), dropped_end.release());
    }
    ASSERT_OK(alterer->Alter());
  }
  // Only the 111 rows written into the -111..0 range are expected to remain.
  NO_FATALS(CheckTableRowsNum(kTableName, 111));

  ASSERT_OK(client_->DeleteTable(kTableName));
}
// Try adding a range partition with a custom hash schema where hash columns
// are duplicated across different dimensions. That should not be possible and
// the client should receive a proper error in response.
TEST_F(FlexPartitioningAlterTableTest, AddRangeWithDuplicateHashColumns) {
  constexpr const char* const kCol0 = "c0";
  constexpr const char* const kCol1 = "c1";
  constexpr const char* const kCol2 = "c2";
  constexpr const char* const kErrMsg =
      "hash bucket schema components must not contain columns in common";
  constexpr const char* const kTableName = "AddRangeWithDuplicateHashColumns";

  // Three-column schema with a compound primary key on (c0, c1).
  KuduSchema schema;
  {
    KuduSchemaBuilder b;
    b.AddColumn(kCol0)->Type(KuduColumnSchema::INT32)->NotNull();
    b.AddColumn(kCol1)->Type(KuduColumnSchema::INT32)->NotNull();
    b.AddColumn(kCol2)->Type(KuduColumnSchema::STRING)->Nullable();
    b.SetPrimaryKey({kCol0, kCol1});
    ASSERT_OK(b.Build(&schema));
  }

  // The table-wide hash schema has two dimensions with no columns in common:
  // 2 buckets on c0 and 3 buckets on c1. The range partitioning is on c0.
  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  table_creator->table_name(kTableName)
      .schema(&schema)
      .num_replicas(1)
      .add_hash_partitions({ kCol0 }, 2)
      .add_hash_partitions({ kCol1 }, 3)
      .set_range_partition_columns({ kCol0 });

  // Add a range partition with the table-wide hash schema.
  {
    unique_ptr<KuduPartialRow> lower(schema.NewRow());
    ASSERT_OK(lower->SetInt32(kCol0, 0));
    unique_ptr<KuduPartialRow> upper(schema.NewRow());
    ASSERT_OK(upper->SetInt32(kCol0, 111));
    table_creator->add_range_partition(lower.release(), upper.release());
  }
  ASSERT_OK(table_creator->Create());

  // One dimension of a range-specific hash schema, i.e. the arguments for
  // a single add_hash_partitions() call on the range partition.
  struct HashDimension {
    vector<string> columns;
    int32_t num_buckets;
    int32_t seed;
  };

  // Every entry is a two-dimensional hash schema whose dimensions share at
  // least one column; all of them must be rejected.
  const vector<vector<HashDimension>> kBadHashSchemas = {
    // The very same single column in both dimensions.
    { HashDimension{ { kCol0 }, 3, 1 },
      HashDimension{ { kCol0 }, 5, 2 } },
    // The column of the second dimension is among the columns of the first.
    { HashDimension{ { kCol0, kCol1 }, 3, 3 },
      HashDimension{ { kCol0 }, 2, 4 } },
    // The column of the first dimension is among the columns of the second.
    { HashDimension{ { kCol1 }, 5, 6 },
      HashDimension{ { kCol0, kCol1 }, 7, 5 } },
    // The same pair of columns in both dimensions, listed in different order.
    { HashDimension{ { kCol0, kCol1 }, 3, 7 },
      HashDimension{ { kCol1, kCol0 }, 7, 8 } },
  };

  // Try to add a range with each of the invalid hash schemas. Building the
  // range partition object itself succeeds; the error is reported by Alter().
  int case_idx = 0;
  for (const auto& hash_schema : kBadHashSchemas) {
    // Identify the failing case in the test output, since all cases share
    // the same source lines now.
    SCOPED_TRACE(Substitute("hash schema case $0", case_idx++));
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    auto p = CreateRangePartition(schema, kCol0, -111, 0);
    for (const auto& dimension : hash_schema) {
      ASSERT_OK(p->add_hash_partitions(
          dimension.columns, dimension.num_buckets, dimension.seed));
    }
    alterer->AddRangePartition(p.release());
    const auto s = alterer->Alter();
    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
  }
}
// Fixture for scan scenarios: builds on FlexPartitioningTest, but asks for
// three tablet servers and lowers the flush threshold before the base
// fixture's SetUp() runs.
class FlexPartitioningScanTest : public FlexPartitioningTest {
 public:
  // Number of tablet servers requested by this fixture.
  int num_tablet_servers() const override {
    return kNumTabletServers;
  }

  void SetUp() override {
    // Accumulated updates have to be flushed to the disk for the DRS-level
    // logic that optimizes scans to kick in as expected, hence the short
    // flush threshold.
    // NOTE(review): presumably the flag is set before the base SetUp() so the
    // servers started there pick it up -- confirm against FlexPartitioningTest.
    FLAGS_flush_threshold_secs = 1;
    FlexPartitioningTest::SetUp();
  }

 private:
  static constexpr int kNumTabletServers = 3;
};
// This scenario is to reproduce the issue described in KUDU-3384: rows with
// keys at both extremes of the INT32 domain (including INT32_MAX itself) are
// written into a table and then repeatedly scanned using scan tokens.
TEST_F(FlexPartitioningScanTest, MaxKeyValue) {
  static constexpr const char* const kTableName = "max_key_value";

  // The table has no hash partitioning and two range partitions that meet at
  // key 0. The outer bound of each range is left unset.
  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  table_creator->table_name(kTableName)
      .schema(&schema_)
      .num_replicas(3)
      .set_range_partition_columns({ kKeyColumn });
  {
    // Lower bound left unset, upper bound at 0.
    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
    ASSERT_OK(upper->SetInt32(kKeyColumn, 0));
    table_creator->add_range_partition(lower.release(), upper.release());
  }
  {
    // Lower bound at 0, upper bound left unset.
    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
    ASSERT_OK(lower->SetInt32(kKeyColumn, 0));
    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
    table_creator->add_range_partition(lower.release(), upper.release());
  }
  ASSERT_OK(table_creator->Create());

  shared_ptr<KuduSession> session = client_->NewSession();
  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
  shared_ptr<KuduTable> table;
  ASSERT_OK(client_->OpenTable(kTableName, &table));

  // Not using InsertTestRows() since rows with key values up to and including
  // INT32_MAX need to be inserted. In every row, the columns at index 0 and 1
  // are both set to the key value.
  // The ten smallest INT32 values.
  for (int32_t i = INT32_MIN; i < INT32_MIN + 10; ++i) {
    std::unique_ptr<client::KuduInsert> insert(table->NewInsert());
    KuduPartialRow* row = insert->mutable_row();
    ASSERT_OK(row->SetInt32(0, i));
    ASSERT_OK(row->SetInt32(1, i));
    ASSERT_OK(session->Apply(insert.release()));
  }
  // The ten greatest INT32 values, INT32_MAX included.
  for (int32_t i = INT32_MAX; i > INT32_MAX - 10; --i) {
    std::unique_ptr<client::KuduInsert> insert(table->NewInsert());
    KuduPartialRow* row = insert->mutable_row();
    ASSERT_OK(row->SetInt32(0, i));
    ASSERT_OK(row->SetInt32(1, i));
    ASSERT_OK(session->Apply(insert.release()));
  }
  ASSERT_OK(session->Flush());

  // Scan the whole table many times over; every scan must succeed.
  // NOTE(review): the iteration count presumably gives different replicas
  // a chance to serve the scan -- confirm. The contents of 'rows' are not
  // verified, only the status of each scan is.
  for (auto i = 0; i < 25; ++i) {
    SCOPED_TRACE(Substitute("iteration $0", i));
    KuduScanTokenBuilder builder(table.get());
    ASSERT_OK(builder.SetTimeoutMillis(60000));
    vector<KuduScanToken*> tokens;
    // Frees the scan tokens when leaving the iteration.
    ElementDeleter tokens_deleter(&tokens);
    ASSERT_OK(builder.Build(&tokens));
    vector<string> rows;
    for (auto* token : tokens) {
      KuduScanner* scanner_ptr = nullptr;
      ASSERT_OK(token->IntoKuduScanner(&scanner_ptr));
      // Take ownership right away: ASSERT_OK() returns from the test upon
      // failure, so a scanner held only by a raw pointer would be leaked.
      unique_ptr<KuduScanner> scanner(scanner_ptr);
      ASSERT_OK(scanner->SetReadMode(KuduScanner::ReadMode::READ_AT_SNAPSHOT));
      ASSERT_OK(ScanToStrings(scanner.get(), &rows));
    }
  }
}
// Try adding a range partition with a custom hash schema where the number of
// hash buckets is invalid (less than two).
TEST_F(FlexPartitioningAlterTableTest, AddRangeWithWrongHashBucketsNumber) {
  constexpr const char* const kCol0 = "c0";
  constexpr const char* const kCol1 = "c1";
  constexpr const char* const kErrMsg =
      "at least two buckets are required to establish hash partitioning";
  constexpr const char* const kTableName = "AddRangeWithWrongHashBucketsNumber";

  // Two-column schema with c0 as the primary key.
  KuduSchema schema;
  {
    KuduSchemaBuilder b;
    b.AddColumn(kCol0)->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
    b.AddColumn(kCol1)->Type(KuduColumnSchema::STRING)->Nullable();
    ASSERT_OK(b.Build(&schema));
  }

  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  table_creator->table_name(kTableName)
      .schema(&schema)
      .num_replicas(1)
      .add_hash_partitions({ kCol0 }, 2)
      .set_range_partition_columns({ kCol0 });

  // Add a range partition with the table-wide hash schema.
  {
    unique_ptr<KuduPartialRow> lower(schema.NewRow());
    ASSERT_OK(lower->SetInt32(kCol0, -100));
    unique_ptr<KuduPartialRow> upper(schema.NewRow());
    ASSERT_OK(upper->SetInt32(kCol0, 0));
    table_creator->add_range_partition(lower.release(), upper.release());
  }
  ASSERT_OK(table_creator->Create());

  // Try to add hash partitions with wrong number of buckets (-1, 0, and 1) in
  // the range-specific hash schema. In Kudu C++ client, such mistakes are
  // caught at the client side: add_hash_partitions() itself returns the error,
  // so no table alterer is needed to observe it.
  for (auto hash_bucket_num = -1; hash_bucket_num < 2; ++hash_bucket_num) {
    SCOPED_TRACE(Substitute("hash schema with $0 buckets", hash_bucket_num));
    auto p = CreateRangePartition(schema, kCol0, 0, 100);
    const auto s = p->add_hash_partitions({ kCol0 }, hash_bucket_num, 0);
    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
  }
}
} // namespace client
} // namespace kudu