blob: 5c2fea999fe09d19b3458e51b29e1f23ecf08b66 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Integration test for flexible partitioning (e.g. hash buckets, range
// partitioning of PK subsets, etc.).
#include <algorithm>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include <glog/stl_logging.h>

#include "kudu/client/client-test-util.h"
#include "kudu/common/partial_row.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/escaping.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/external_mini_cluster.h"
#include "kudu/tools/data_gen_util.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/test_util.h"
namespace kudu {
namespace itest {
// Client API types exercised by this test.
using client::KuduClient;
using client::KuduClientBuilder;
using client::KuduColumnSchema;
using client::KuduInsert;
using client::KuduPredicate;
using client::KuduScanner;
using client::KuduSchema;
using client::KuduSchemaBuilder;
using client::KuduSession;
using client::KuduTable;
using client::KuduTableCreator;
using client::KuduValue;
using client::sp::shared_ptr;
// 'string' is used unqualified throughout this file; declare it explicitly
// rather than relying on a using-declaration leaked by another header.
using std::string;
using std::unordered_map;
using std::vector;
using strings::Substitute;

// Name of the table created by CreateTable() and opened for every test case.
static const char* const kTableName = "test-table";

// Number of rows inserted by InsertRandomRows(). CreateTable() also divides
// this value by the requested number of splits to space out split points.
static const int kNumRows = 1000;
class FlexPartitioningITest : public KuduTest {
public:
FlexPartitioningITest()
: random_(GetRandomSeed32()) {
}
virtual void SetUp() OVERRIDE {
KuduTest::SetUp();
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = 1;
// This test produces lots of tablets. With container and log preallocation,
// we end up using quite a bit of disk space. So, we disable them.
opts.extra_tserver_flags.push_back("--log_container_preallocate_bytes=0");
opts.extra_tserver_flags.push_back("--log_preallocate_segments=false");
cluster_.reset(new ExternalMiniCluster(opts));
ASSERT_OK(cluster_->Start());
KuduClientBuilder builder;
ASSERT_OK(cluster_->CreateClient(builder, &client_));
ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
cluster_->messenger(),
&ts_map_));
}
virtual void TearDown() OVERRIDE {
cluster_->Shutdown();
KuduTest::TearDown();
STLDeleteValues(&ts_map_);
STLDeleteElements(&inserted_rows_);
}
protected:
void CreateTable(int num_columns,
const vector<string>& bucket_a, int num_buckets_a,
const vector<string>& bucket_b, int num_buckets_b,
const vector<string>& range_cols,
int num_splits) {
// Set up the actual PK columns based on num_columns. The PK is made up
// of all the columns.
KuduSchemaBuilder b;
vector<string> pk;
for (int i = 0; i < num_columns; i++) {
string name = Substitute("c$0", i);
b.AddColumn(name)->Type(KuduColumnSchema::INT32)->NotNull();
pk.push_back(name);
}
b.SetPrimaryKey(pk);
KuduSchema schema;
ASSERT_OK(b.Build(&schema));
gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
table_creator->table_name(kTableName)
.schema(&schema)
.num_replicas(1);
// Set up partitioning.
if (!bucket_a.empty()) {
table_creator->add_hash_partitions(bucket_a, num_buckets_a);
}
if (!bucket_b.empty()) {
table_creator->add_hash_partitions(bucket_b, num_buckets_b);
}
table_creator->set_range_partition_columns(range_cols);
// Compute split points.
vector<const KuduPartialRow*> split_rows;
int increment = kNumRows / num_splits;
for (int i = 1; i < num_splits; i++) {
KuduPartialRow* row = schema.NewRow();
for (int j = 0; j < range_cols.size(); j++) {
const string& range_col = range_cols[j];
if (j == 0) {
// Set the first component of the range to a set increment.
ASSERT_OK(row->SetInt32(range_col, increment * i));
} else {
ASSERT_OK(row->SetInt32(range_col, random_.Next32()));
}
}
split_rows.push_back(row);
}
table_creator->split_rows(split_rows);
ASSERT_OK(table_creator->Create());
ASSERT_OK(client_->OpenTable(kTableName, &table_));
}
int CountTablets() {
vector<tserver::ListTabletsResponsePB_StatusAndSchemaPB> tablets;
CHECK_OK(ListTablets(ts_map_.begin()->second, MonoDelta::FromSeconds(10), &tablets));
return tablets.size();
}
// Insert 'kNumRows' rows into the given table. The first column 'c0' is ascending,
// but the rest are random int32s.
Status InsertRandomRows();
// Perform a scan with a predicate on 'col_name' BETWEEN 'lower' AND 'upper'.
// Verifies that the results match up with applying the same scan against our
// in-memory copy 'inserted_rows_'.
void CheckScanWithColumnPredicate(Slice col_name, int lower, int upper);
// Like the above, but uses the primary key range scan API in the client to
// scan between 'inserted_rows_[lower]' (inclusive) and 'inserted_rows_[upper]'
// (exclusive).
void CheckPKRangeScan(int lower, int upper);
void CheckPartitionKeyRangeScanWithPKRange(int lower, int upper);
// Performs a series of scans, each over a single tablet in the table, and
// verifies that the aggregated results match up with 'inserted_rows_'.
void CheckPartitionKeyRangeScan();
// Inserts data into the table, then performs a number of scans to verify that
// the data can be retrieved.
void InsertAndVerifyScans();
Random random_;
gscoped_ptr<ExternalMiniCluster> cluster_;
unordered_map<string, TServerDetails*> ts_map_;
shared_ptr<KuduClient> client_;
shared_ptr<KuduTable> table_;
vector<KuduPartialRow*> inserted_rows_;
};
// Generates 'kNumRows' rows with tools::GenerateDataForRow(), applies each one
// as an insert through a manually-flushed session, and stores a heap-allocated
// copy of every row in 'inserted_rows_' for later verification.
//
// Must only be called while 'inserted_rows_' is still empty. Returns the first
// non-OK status produced while configuring the session, applying an insert, or
// flushing; returns the status of the final flush otherwise.
Status FlexPartitioningITest::InsertRandomRows() {
  // Row indexes that are non-zero multiples of this value trigger an
  // intermediate flush.
  const uint64_t kFlushInterval = 1000;

  CHECK(inserted_rows_.empty());

  shared_ptr<KuduSession> session(client_->NewSession());
  session->SetTimeoutMillis(10000);
  RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));

  uint64_t row_idx = 0;
  while (row_idx < kNumRows) {
    gscoped_ptr<KuduInsert> insert(table_->NewInsert());
    KuduPartialRow* row = insert->mutable_row();
    tools::GenerateDataForRow(table_->schema(), row_idx, &random_, row);

    // Remember what was written before handing the insert to the session.
    inserted_rows_.push_back(new KuduPartialRow(*row));
    RETURN_NOT_OK(session->Apply(insert.release()));

    const bool at_flush_point = row_idx != 0 && row_idx % kFlushInterval == 0;
    if (at_flush_point) {
      RETURN_NOT_OK(session->Flush());
    }
    ++row_idx;
  }

  // Push out whatever is still buffered in the session.
  return session->Flush();
}
// Scans the table with the predicate 'lower' <= 'col_name' <= 'upper' and
// verifies that the returned rows are exactly the rows of 'inserted_rows_'
// whose 'col_name' value lies in that inclusive range.
//
// All errors are reported as fatal gtest failures (rather than crashing the
// process), so callers should wrap the call in NO_FATALS().
void FlexPartitioningITest::CheckScanWithColumnPredicate(Slice col_name, int lower, int upper) {
  KuduScanner scanner(table_.get());
  scanner.SetTimeoutMillis(60000);
  ASSERT_OK(scanner.AddConjunctPredicate(table_->NewComparisonPredicate(
      col_name, KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(lower))));
  ASSERT_OK(scanner.AddConjunctPredicate(table_->NewComparisonPredicate(
      col_name, KuduPredicate::LESS_EQUAL, KuduValue::FromInt(upper))));

  vector<string> rows;
  ScanToStrings(&scanner, &rows);
  // Scan order is not part of the check; compare as sorted lists.
  std::sort(rows.begin(), rows.end());

  // Manually evaluate the predicate against the data we think we inserted.
  vector<string> expected_rows;
  for (const KuduPartialRow* row : inserted_rows_) {
    int32_t val;
    ASSERT_OK(row->GetInt32(col_name, &val));
    if (val >= lower && val <= upper) {
      expected_rows.push_back("(" + row->ToString() + ")");
    }
  }
  std::sort(expected_rows.begin(), expected_rows.end());

  // Compare sizes first so that a mismatch produces a short failure message.
  ASSERT_EQ(expected_rows.size(), rows.size());
  ASSERT_EQ(expected_rows, rows);
}
void FlexPartitioningITest::CheckPKRangeScan(int lower, int upper) {
KuduScanner scanner(table_.get());
scanner.SetTimeoutMillis(60000);
ASSERT_OK(scanner.AddLowerBound(*inserted_rows_[lower]));
ASSERT_OK(scanner.AddExclusiveUpperBound(*inserted_rows_[upper]));
vector<string> rows;
ScanToStrings(&scanner, &rows);
std::sort(rows.begin(), rows.end());
vector<string> expected_rows;
for (int i = lower; i < upper; i++) {
expected_rows.push_back("(" + inserted_rows_[i]->ToString() + ")");
}
std::sort(expected_rows.begin(), expected_rows.end());
ASSERT_EQ(rows.size(), expected_rows.size());
ASSERT_EQ(rows, expected_rows);
}
void FlexPartitioningITest::CheckPartitionKeyRangeScan() {
master::GetTableLocationsResponsePB table_locations;
ASSERT_OK(GetTableLocations(cluster_->master_proxy(),
table_->name(),
MonoDelta::FromSeconds(32),
&table_locations));
vector<string> rows;
for (const master::TabletLocationsPB& tablet_locations :
table_locations.tablet_locations()) {
string partition_key_start = tablet_locations.partition().partition_key_start();
string partition_key_end = tablet_locations.partition().partition_key_end();
KuduScanner scanner(table_.get());
scanner.SetTimeoutMillis(60000);
ASSERT_OK(scanner.AddLowerBoundPartitionKeyRaw(partition_key_start));
ASSERT_OK(scanner.AddExclusiveUpperBoundPartitionKeyRaw(partition_key_end));
ScanToStrings(&scanner, &rows);
}
std::sort(rows.begin(), rows.end());
vector<string> expected_rows;
for (KuduPartialRow* row : inserted_rows_) {
expected_rows.push_back("(" + row->ToString() + ")");
}
std::sort(expected_rows.begin(), expected_rows.end());
ASSERT_EQ(rows.size(), expected_rows.size());
ASSERT_EQ(rows, expected_rows);
}
void FlexPartitioningITest::CheckPartitionKeyRangeScanWithPKRange(int lower, int upper) {
master::GetTableLocationsResponsePB table_locations;
ASSERT_OK(GetTableLocations(cluster_->master_proxy(),
table_->name(),
MonoDelta::FromSeconds(32),
&table_locations));
vector<string> rows;
for (const master::TabletLocationsPB& tablet_locations :
table_locations.tablet_locations()) {
string partition_key_start = tablet_locations.partition().partition_key_start();
string partition_key_end = tablet_locations.partition().partition_key_end();
KuduScanner scanner(table_.get());
scanner.SetTimeoutMillis(60000);
ASSERT_OK(scanner.AddLowerBoundPartitionKeyRaw(partition_key_start));
ASSERT_OK(scanner.AddExclusiveUpperBoundPartitionKeyRaw(partition_key_end));
ASSERT_OK(scanner.AddLowerBound(*inserted_rows_[lower]));
ASSERT_OK(scanner.AddExclusiveUpperBound(*inserted_rows_[upper]));
ScanToStrings(&scanner, &rows);
}
std::sort(rows.begin(), rows.end());
vector<string> expected_rows;
for (int i = lower; i < upper; i++) {
expected_rows.push_back("(" + inserted_rows_[i]->ToString() + ")");
}
std::sort(expected_rows.begin(), expected_rows.end());
ASSERT_EQ(rows.size(), expected_rows.size());
ASSERT_EQ(rows, expected_rows);
}
void FlexPartitioningITest::InsertAndVerifyScans() {
ASSERT_OK(InsertRandomRows());
// First, ensure that we get back the same number we put in.
{
vector<string> rows;
ScanTableToStrings(table_.get(), &rows);
std::sort(rows.begin(), rows.end());
ASSERT_EQ(kNumRows, rows.size());
}
// Perform some scans with predicates.
// 1) Various predicates on 'c0', which has non-random data.
// We concentrate around the value '500' since there is a split point
// there.
NO_FATALS(CheckScanWithColumnPredicate("c0", 100, 120));
NO_FATALS(CheckScanWithColumnPredicate("c0", 490, 610));
NO_FATALS(CheckScanWithColumnPredicate("c0", 499, 499));
NO_FATALS(CheckScanWithColumnPredicate("c0", 500, 500));
NO_FATALS(CheckScanWithColumnPredicate("c0", 501, 501));
NO_FATALS(CheckScanWithColumnPredicate("c0", 499, 501));
NO_FATALS(CheckScanWithColumnPredicate("c0", 499, 500));
NO_FATALS(CheckScanWithColumnPredicate("c0", 500, 501));
// 2) Random range predicates on the other columns, which are random ints.
for (int col_idx = 1; col_idx < table_->schema().num_columns(); col_idx++) {
SCOPED_TRACE(col_idx);
for (int i = 0; i < 10; i++) {
int32_t lower = random_.Next32();
int32_t upper = random_.Next32();
if (upper < lower) {
std::swap(lower, upper);
}
NO_FATALS(CheckScanWithColumnPredicate(table_->schema().Column(col_idx).name(),
lower, upper));
}
}
// 3) Use the "primary key range" API.
{
NO_FATALS(CheckPKRangeScan(100, 120));
NO_FATALS(CheckPKRangeScan(490, 610));
NO_FATALS(CheckPKRangeScan(499, 499));
NO_FATALS(CheckPKRangeScan(500, 500));
NO_FATALS(CheckPKRangeScan(501, 501));
NO_FATALS(CheckPKRangeScan(499, 501));
NO_FATALS(CheckPKRangeScan(499, 500));
NO_FATALS(CheckPKRangeScan(500, 501));
}
// 4) Use the Per-tablet "partition key range" API.
{
NO_FATALS(CheckPartitionKeyRangeScan());
}
// 5) Use the Per-tablet "partition key range" API with primary key range.
{
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(100, 120));
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(200, 400));
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(490, 610));
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(499, 499));
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(500, 500));
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(501, 501));
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(499, 501));
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(499, 500));
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(500, 501));
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(650, 700));
NO_FATALS(CheckPartitionKeyRangeScanWithPKRange(700, 800));
}
}
// CREATE TABLE t (
//   c0 INT32 PRIMARY KEY,
//   RANGE PARTITION BY (c0)
// );
//
// The range is split once, so two tablets are expected.
TEST_F(FlexPartitioningITest, TestSimplePartitioning) {
  const vector<string> no_hash_cols;
  const vector<string> range_cols = { "c0" };
  NO_FATALS(CreateTable(1,                // 1 column
                        no_hash_cols, 0,  // no hash buckets
                        no_hash_cols, 0,  // no hash buckets
                        range_cols,       // range partition by c0
                        2));              // 2 range partitions (1 split)
  ASSERT_EQ(2, CountTablets());
  InsertAndVerifyScans();
}
// CREATE TABLE t (
//   c0 INT32 PRIMARY KEY,
//   BUCKET BY (c0) INTO 3 BUCKETS
// );
//
// The range on c0 is additionally split once: 3 buckets x 2 ranges = 6 tablets.
TEST_F(FlexPartitioningITest, TestSinglePKBucketed) {
  const vector<string> hash_cols = { "c0" };
  const vector<string> no_hash_cols;
  const vector<string> range_cols = { "c0" };
  NO_FATALS(CreateTable(1,                // 1 column
                        hash_cols, 3,     // bucket by c0 into 3 buckets
                        no_hash_cols, 0,  // no other buckets
                        range_cols,       // default range (the whole PK)
                        2));              // 2 range partitions (1 split)
  ASSERT_EQ(6, CountTablets());
  InsertAndVerifyScans();
}
// CREATE TABLE t (
//   c0 INT32,
//   c1 INT32,
//   PRIMARY KEY (c0, c1)
//   BUCKET BY (c1) INTO 3 BUCKETS
// );
//
// The range is not split, so one tablet per bucket is expected.
TEST_F(FlexPartitioningITest, TestCompositePK_BucketOnSecondColumn) {
  const vector<string> hash_cols = { "c1" };
  const vector<string> no_hash_cols;
  const vector<string> range_cols = { "c0", "c1" };
  NO_FATALS(CreateTable(2,                // 2 columns
                        hash_cols, 3,     // bucket by c1 into 3 buckets
                        no_hash_cols, 0,  // no other buckets
                        range_cols,       // default range (the whole PK)
                        1));              // 1 range partition (no splits)
  ASSERT_EQ(3, CountTablets());
  InsertAndVerifyScans();
}
// CREATE TABLE t (
//   c0 INT32,
//   c1 INT32,
//   PRIMARY KEY (c0, c1)
//   RANGE PARTITION BY (c1, c0)
// );
//
// The range is split once, so two tablets are expected.
TEST_F(FlexPartitioningITest, TestCompositePK_RangePartitionByReversedPK) {
  const vector<string> no_hash_cols;
  const vector<string> range_cols = { "c1", "c0" };
  NO_FATALS(CreateTable(2,                // 2 columns
                        no_hash_cols, 0,  // no buckets
                        no_hash_cols, 0,  // no buckets
                        range_cols,       // range partition by the reversed PK
                        2));              // 2 range partitions (1 split)
  ASSERT_EQ(2, CountTablets());
  InsertAndVerifyScans();
}
// CREATE TABLE t (
//   c0 INT32,
//   c1 INT32,
//   PRIMARY KEY (c0, c1)
//   RANGE PARTITION BY (c0)
// );
//
// The range is split once, so two tablets are expected.
TEST_F(FlexPartitioningITest, TestCompositePK_RangePartitionByPKPrefix) {
  const vector<string> no_hash_cols;
  const vector<string> range_cols = { "c0" };
  NO_FATALS(CreateTable(2,                // 2 columns
                        no_hash_cols, 0,  // no buckets
                        no_hash_cols, 0,  // no buckets
                        range_cols,       // range partition by c0
                        2));              // 2 range partitions (1 split)
  ASSERT_EQ(2, CountTablets());
  InsertAndVerifyScans();
}
// CREATE TABLE t (
//   c0 INT32,
//   c1 INT32,
//   PRIMARY KEY (c0, c1)
//   RANGE PARTITION BY (c1)
// );
//
// The range is split once, so two tablets are expected.
TEST_F(FlexPartitioningITest, TestCompositePK_RangePartitionByPKSuffix) {
  const vector<string> no_hash_cols;
  const vector<string> range_cols = { "c1" };
  NO_FATALS(CreateTable(2,                // 2 columns
                        no_hash_cols, 0,  // no buckets
                        no_hash_cols, 0,  // no buckets
                        range_cols,       // range partition by c1
                        2));              // 2 range partitions (1 split)
  ASSERT_EQ(2, CountTablets());
  InsertAndVerifyScans();
}
// CREATE TABLE t (
//   c0 INT32,
//   c1 INT32,
//   PRIMARY KEY (c0, c1)
//   RANGE PARTITION BY (c0),
//   BUCKET BY (c1) INTO 4 BUCKETS
// );
//
// The range is split once: 4 buckets x 2 ranges = 8 tablets.
TEST_F(FlexPartitioningITest, TestCompositePK_RangeAndBucket) {
  const vector<string> hash_cols = { "c1" };
  const vector<string> no_hash_cols;
  const vector<string> range_cols = { "c0" };
  NO_FATALS(CreateTable(2,                // 2 columns
                        hash_cols, 4,     // BUCKET BY c1 INTO 4 BUCKETS
                        no_hash_cols, 0,  // no other buckets
                        range_cols,       // range partition by c0
                        2));              // 2 range partitions (1 split)
  ASSERT_EQ(8, CountTablets());
  InsertAndVerifyScans();
}
// CREATE TABLE t (
//   c0 INT32,
//   c1 INT32,
//   PRIMARY KEY (c0, c1)
//   BUCKET BY (c1) INTO 4 BUCKETS,
//   BUCKET BY (c0) INTO 3 BUCKETS
// );
//
// The default range is split once: 4 x 3 buckets x 2 ranges = 24 tablets.
TEST_F(FlexPartitioningITest, TestCompositePK_MultipleBucketings) {
  const vector<string> first_hash_cols = { "c1" };
  const vector<string> second_hash_cols = { "c0" };
  const vector<string> range_cols = { "c0", "c1" };
  NO_FATALS(CreateTable(2,                    // 2 columns
                        first_hash_cols, 4,   // BUCKET BY c1 INTO 4 BUCKETS
                        second_hash_cols, 3,  // BUCKET BY c0 INTO 3 BUCKETS
                        range_cols,           // default range (the whole PK)
                        2));                  // 2 range partitions (1 split)
  ASSERT_EQ(4 * 3 * 2, CountTablets());
  InsertAndVerifyScans();
}
// CREATE TABLE t (
//   c0 INT32,
//   c1 INT32,
//   PRIMARY KEY (c0, c1)
//   RANGE PARTITION BY (),
//   BUCKET BY (c0) INTO 4 BUCKETS
// );
//
// With no range columns and no splits, one tablet per bucket is expected.
TEST_F(FlexPartitioningITest, TestCompositePK_SingleBucketNoRange) {
  const vector<string> hash_cols = { "c0" };
  const vector<string> no_cols;
  NO_FATALS(CreateTable(2,             // 2 columns
                        hash_cols, 4,  // BUCKET BY c0 INTO 4 BUCKETS
                        no_cols, 0,    // no other buckets
                        no_cols,       // no range partitioning
                        1));           // 1 range partition (0 splits)
  ASSERT_EQ(4, CountTablets());
  InsertAndVerifyScans();
}
// CREATE TABLE t (
//   c0 INT32,
//   c1 INT32,
//   PRIMARY KEY (c0, c1)
//   RANGE PARTITION BY (),
//   BUCKET BY (c0) INTO 4 BUCKETS,
//   BUCKET BY (c1) INTO 5 BUCKETS
// );
//
// With no range columns and no splits: 4 x 5 buckets = 20 tablets.
TEST_F(FlexPartitioningITest, TestCompositePK_MultipleBucketingsNoRange) {
  const vector<string> first_hash_cols = { "c0" };
  const vector<string> second_hash_cols = { "c1" };
  const vector<string> no_range_cols;
  NO_FATALS(CreateTable(2,                    // 2 columns
                        first_hash_cols, 4,   // BUCKET BY c0 INTO 4 BUCKETS
                        second_hash_cols, 5,  // BUCKET BY c1 INTO 5 BUCKETS
                        no_range_cols,        // no range partitioning
                        1));                  // 1 range partition (0 splits)
  ASSERT_EQ(20, CountTablets());
  InsertAndVerifyScans();
}
} // namespace itest
} // namespace kudu