blob: 45c2f3495f948065ca6da2f89f4f697fdad6d83c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/common/partition_pruner.h"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/common/column_predicate.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/row.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
using boost::optional;
using std::count_if;
using std::get;
using std::make_tuple;
using std::move;
using std::string;
using std::tuple;
using std::unique_ptr;
using std::vector;
namespace kudu {
// Test fixture shared by every partition pruner test in this file. It adds no
// members of its own on top of KuduTest.
class PartitionPrunerTest : public KuduTest {
};
void CheckPrunedPartitions(const Schema& schema,
const PartitionSchema& partition_schema,
const vector<Partition>& partitions,
const ScanSpec& spec,
size_t remaining_tablets,
size_t pruner_ranges) {
ScanSpec opt_spec(spec);
Arena arena(256);
opt_spec.OptimizeScan(schema, &arena, false);
PartitionPruner pruner;
pruner.Init(schema, partition_schema, opt_spec);
SCOPED_TRACE(strings::Substitute("schema: $0", schema.ToString()));
SCOPED_TRACE(strings::Substitute("partition schema: $0", partition_schema.DebugString(schema)));
SCOPED_TRACE(strings::Substitute("partition pruner: $0",
pruner.ToString(schema, partition_schema)));
SCOPED_TRACE(strings::Substitute("optimized scan spec: $0", opt_spec.ToString(schema)));
SCOPED_TRACE(strings::Substitute("original scan spec: $0", spec.ToString(schema)));
int pruned_partitions = count_if(partitions.begin(), partitions.end(),
[&] (const Partition& partition) {
return pruner.ShouldPrune(partition);
});
ASSERT_EQ(remaining_tablets, partitions.size() - pruned_partitions);
ASSERT_EQ(pruner_ranges, pruner.NumRangesRemainingForTests());
}
TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) {
  // Table layout exercised by this test:
  //
  //   CREATE TABLE t
  //   (a INT8, b INT8, c INT8)
  //   PRIMARY KEY (a, b, c)) SPLIT ROWS [(0, 0, 0), (10, 10, 10)]
  //   DISTRIBUTE BY RANGE(a, b, c);
  using PkTuple = tuple<int8_t, int8_t, int8_t>;

  Schema schema({ ColumnSchema("a", INT8),
                  ColumnSchema("b", INT8),
                  ColumnSchema("c", INT8) },
                { ColumnId(0), ColumnId(1), ColumnId(2) },
                3);
  Arena arena(1024);

  PartitionSchema partition_schema;
  ASSERT_OK(PartitionSchema::FromPB(PartitionSchemaPB(), schema, &partition_schema));

  // Split rows (0, 0, 0) and (10, 10, 10).
  KuduPartialRow low_split(&schema);
  ASSERT_OK(low_split.SetInt8("a", 0));
  ASSERT_OK(low_split.SetInt8("b", 0));
  ASSERT_OK(low_split.SetInt8("c", 0));

  KuduPartialRow high_split(&schema);
  ASSERT_OK(high_split.SetInt8("a", 10));
  ASSERT_OK(high_split.SetInt8("b", 10));
  ASSERT_OK(high_split.SetInt8("c", 10));

  vector<Partition> partitions;
  ASSERT_OK(partition_schema.CreatePartitions({ low_split, high_split }, {}, {},
                                              schema, &partitions));

  // Builds a scan with an optional inclusive lower and an optional exclusive
  // upper primary key bound, then verifies how many tablets survive pruning.
  auto Check = [&] (optional<PkTuple> lower,
                    optional<PkTuple> upper,
                    size_t remaining_tablets) {
    ScanSpec spec;

    // Backing rows for the bound keys. They are declared at this scope so
    // that they stay alive until the check at the end of the lambda is done.
    KuduPartialRow lower_row(&schema);
    KuduPartialRow upper_row(&schema);

    if (lower) {
      const PkTuple& pk = *lower;
      CHECK_OK(lower_row.SetInt8("a", get<0>(pk)));
      CHECK_OK(lower_row.SetInt8("b", get<1>(pk)));
      CHECK_OK(lower_row.SetInt8("c", get<2>(pk)));
      ConstContiguousRow contiguous(lower_row.schema(), lower_row.row_data_);
      EncodedKey* encoded = EncodedKey::FromContiguousRow(contiguous, &arena);
      spec.SetLowerBoundKey(encoded);
    }

    if (upper) {
      const PkTuple& pk = *upper;
      CHECK_OK(upper_row.SetInt8("a", get<0>(pk)));
      CHECK_OK(upper_row.SetInt8("b", get<1>(pk)));
      CHECK_OK(upper_row.SetInt8("c", get<2>(pk)));
      ConstContiguousRow contiguous(upper_row.schema(), upper_row.row_data_);
      EncodedKey* encoded = EncodedKey::FromContiguousRow(contiguous, &arena);
      spec.SetExclusiveUpperBoundKey(encoded);
    }

    // One pruner range is expected whenever at least one tablet remains.
    const size_t pruner_ranges = remaining_tablets > 0 ? 1 : 0;
    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
                          remaining_tablets, pruner_ranges);
  };

  // No bounds.
  Check(boost::none, boost::none, 3);

  // PK < (-1, min, min)
  Check(boost::none, PkTuple(-1, INT8_MIN, INT8_MIN), 1);

  // PK < (10, 10, 10)
  Check(boost::none, PkTuple(10, 10, 10), 2);

  // PK < (100, min, min)
  Check(boost::none, PkTuple(100, INT8_MIN, INT8_MIN), 3);

  // PK >= (-10, -10, -10)
  Check(PkTuple(-10, -10, -10), boost::none, 3);

  // PK >= (0, 0, 0)
  Check(PkTuple(0, 0, 0), boost::none, 2);

  // PK >= (100, 0, 0)
  Check(PkTuple(100, 0, 0), boost::none, 1);

  // PK >= (-10, 0, 0) AND PK < (100, 0, 0)
  Check(PkTuple(-10, 0, 0), PkTuple(100, 0, 0), 3);

  // PK >= (0, 0, 0) AND PK < (10, 10, 10)
  Check(PkTuple(0, 0, 0), PkTuple(10, 10, 10), 1);

  // PK >= (0, 0, 0) AND PK < (10, 10, 11)
  Check(PkTuple(0, 0, 0), PkTuple(10, 10, 11), 2);

  // Empty interval: PK >= (10, 10, 11) AND PK < (0, 0, 0)
  Check(PkTuple(10, 10, 11), PkTuple(0, 0, 0), 0);
}
TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
  // Table layout exercised by this test:
  //
  //   CREATE TABLE t
  //   (a INT8, b STRING, c STRING, PRIMARY KEY (a, b, c))
  //   DISTRIBUTE BY RANGE(a, b)
  //   SPLIT ROWS [(0, "m"), (10, "r")];
  using PkTuple = tuple<int8_t, string, string>;

  Schema schema({ ColumnSchema("a", INT8),
                  ColumnSchema("b", STRING),
                  ColumnSchema("c", STRING) },
                { ColumnId(0), ColumnId(1), ColumnId(2) },
                3);
  Arena arena(1024);

  // Range partitioning covers only the (a, b) prefix of the primary key.
  PartitionSchemaPB pb;
  auto* range_pb = pb.mutable_range_schema();
  range_pb->add_columns()->set_name("a");
  range_pb->add_columns()->set_name("b");

  PartitionSchema partition_schema;
  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));

  // Split rows (0, "m") and (10, "r").
  KuduPartialRow low_split(&schema);
  ASSERT_OK(low_split.SetInt8("a", 0));
  ASSERT_OK(low_split.SetStringCopy("b", "m"));

  KuduPartialRow high_split(&schema);
  ASSERT_OK(high_split.SetInt8("a", 10));
  ASSERT_OK(high_split.SetStringCopy("b", "r"));

  vector<Partition> partitions;
  ASSERT_OK(partition_schema.CreatePartitions({ low_split, high_split }, {}, {},
                                              schema, &partitions));

  // Builds a scan with an optional inclusive lower and an optional exclusive
  // upper primary key bound, then verifies how many partitions survive
  // pruning.
  auto Check = [&] (optional<PkTuple> lower,
                    optional<PkTuple> upper,
                    size_t remaining_tablets) {
    ScanSpec spec;

    // Backing rows for the bound keys. They are declared at this scope so
    // that they stay alive until the check at the end of the lambda is done.
    KuduPartialRow lower_row(&schema);
    KuduPartialRow upper_row(&schema);

    if (lower) {
      const PkTuple& pk = *lower;
      CHECK_OK(lower_row.SetInt8("a", get<0>(pk)));
      CHECK_OK(lower_row.SetStringCopy("b", get<1>(pk)));
      CHECK_OK(lower_row.SetStringCopy("c", get<2>(pk)));
      ConstContiguousRow contiguous(lower_row.schema(), lower_row.row_data_);
      EncodedKey* encoded = EncodedKey::FromContiguousRow(contiguous, &arena);
      spec.SetLowerBoundKey(encoded);
    }

    if (upper) {
      const PkTuple& pk = *upper;
      CHECK_OK(upper_row.SetInt8("a", get<0>(pk)));
      CHECK_OK(upper_row.SetStringCopy("b", get<1>(pk)));
      CHECK_OK(upper_row.SetStringCopy("c", get<2>(pk)));
      ConstContiguousRow contiguous(upper_row.schema(), upper_row.row_data_);
      EncodedKey* encoded = EncodedKey::FromContiguousRow(contiguous, &arena);
      spec.SetExclusiveUpperBoundKey(encoded);
    }

    // One pruner range is expected whenever at least one tablet remains.
    const size_t pruner_ranges = remaining_tablets > 0 ? 1 : 0;
    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
                          remaining_tablets, pruner_ranges);
  };

  // No bounds.
  Check(boost::none, boost::none, 3);

  // PK < (-1, "", "")
  Check(boost::none, PkTuple(-1, "", ""), 1);

  // PK < (10, "r", "")
  Check(boost::none, PkTuple(10, "r", ""), 2);

  // PK < (10, "r", "z")
  Check(boost::none, PkTuple(10, "r", "z"), 3);

  // PK < (100, "", "")
  Check(boost::none, PkTuple(100, "", ""), 3);

  // PK >= (-10, "m", "")
  Check(PkTuple(-10, "m", ""), boost::none, 3);

  // PK >= (0, "", "")
  Check(PkTuple(0, "", ""), boost::none, 3);

  // PK >= (0, "m", "")
  Check(PkTuple(0, "m", ""), boost::none, 2);

  // PK >= (100, "", "")
  Check(PkTuple(100, "", ""), boost::none, 1);

  // PK >= (-10, "", "") AND PK < (100, "", "")
  Check(PkTuple(-10, "", ""), PkTuple(100, "", ""), 3);

  // PK >= (0, "m", "") AND PK < (10, "r", "")
  Check(PkTuple(0, "m", ""), PkTuple(10, "r", ""), 1);

  // PK >= (0, "m", "") AND PK < (10, "r", "z")
  Check(PkTuple(0, "m", ""), PkTuple(10, "r", "z"), 2);

  // PK >= (0, "", "") AND PK < (10, "m", "z")
  Check(PkTuple(0, "", ""), PkTuple(10, "m", "z"), 2);

  // PK >= (10, "m", "") AND PK < (10, "m", "z")
  Check(PkTuple(10, "m", ""), PkTuple(10, "m", "z"), 1);
}
TEST_F(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning) {
  // Table layout exercised by this test:
  //
  //   CREATE TABLE t
  //   (a INT8, b INT8, c INT8, PRIMARY KEY (a, b, c))
  //   DISTRIBUTE BY RANGE(a, b)
  //   SPLIT ROWS [(0, 0)];
  using PkTuple = tuple<int8_t, int8_t, int8_t>;

  Schema schema({ ColumnSchema("a", INT8),
                  ColumnSchema("b", INT8),
                  ColumnSchema("c", INT8) },
                { ColumnId(0), ColumnId(1), ColumnId(2) },
                3);
  Arena arena(1024);

  // Range partitioning covers only the (a, b) prefix of the primary key.
  PartitionSchema partition_schema;
  auto pb = PartitionSchemaPB();
  auto* range_schema = pb.mutable_range_schema();
  range_schema->add_columns()->set_name("a");
  range_schema->add_columns()->set_name("b");
  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));

  // Single split row (0, 0), i.e. two partitions.
  KuduPartialRow split(&schema);
  ASSERT_OK(split.SetInt8("a", 0));
  ASSERT_OK(split.SetInt8("b", 0));

  vector<Partition> partitions;
  ASSERT_OK(partition_schema.CreatePartitions({ split }, {}, {}, schema, &partitions));

  // Applies the specified lower (inclusive) and upper (exclusive) bound
  // primary keys to a scan, and checks that the expected number of
  // partitions remain after pruning.
  auto Check = [&] (optional<PkTuple> lower,
                    optional<PkTuple> upper,
                    size_t remaining_tablets) {
    ScanSpec spec;

    // Backing rows for the bound keys. They are declared at this scope so
    // that they stay alive until the check at the end of the lambda is done.
    KuduPartialRow lower_bound(&schema);
    KuduPartialRow upper_bound(&schema);

    if (lower) {
      CHECK_OK(lower_bound.SetInt8("a", get<0>(*lower)));
      CHECK_OK(lower_bound.SetInt8("b", get<1>(*lower)));
      CHECK_OK(lower_bound.SetInt8("c", get<2>(*lower)));
      ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
      EncodedKey* enc_lower_bound = EncodedKey::FromContiguousRow(row, &arena);
      spec.SetLowerBoundKey(enc_lower_bound);
    }

    if (upper) {
      CHECK_OK(upper_bound.SetInt8("a", get<0>(*upper)));
      CHECK_OK(upper_bound.SetInt8("b", get<1>(*upper)));
      CHECK_OK(upper_bound.SetInt8("c", get<2>(*upper)));
      ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
      EncodedKey* enc_upper_bound = EncodedKey::FromContiguousRow(row, &arena);
      spec.SetExclusiveUpperBoundKey(enc_upper_bound);
    }

    // One pruner range is expected whenever at least one tablet remains.
    size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
                          remaining_tablets, pruner_ranges);
  };

  // No bounds.
  Check(boost::none, boost::none, 2);

  // PK < (0, 0, min)
  // The bound sits exactly on the split row with a minimal 'c', so only the
  // first partition can contain matching rows.
  Check(boost::none, PkTuple(0, 0, INT8_MIN), 1);

  // PK < (0, 0, 0)
  Check(boost::none, PkTuple(0, 0, 0), 2);

  // PK < (0, max, 0)
  Check(boost::none, PkTuple(0, INT8_MAX, 0), 2);

  // PK < (max, max, min)
  Check(boost::none, PkTuple(INT8_MAX, INT8_MAX, INT8_MIN), 2);

  // PK < (max, max, 0)
  Check(boost::none, PkTuple(INT8_MAX, INT8_MAX, 0), 2);

  // PK >= (0, 0, 0)
  Check(PkTuple(0, 0, 0), boost::none, 1);

  // PK >= (0, 0, -1)
  Check(PkTuple(0, 0, -1), boost::none, 1);

  // PK >= (0, 0, min)
  Check(PkTuple(0, 0, INT8_MIN), boost::none, 1);
}
TEST_F(PartitionPrunerTest, TestRangePruning) {
  // Table layout exercised by this test:
  //
  //   CREATE TABLE t
  //   (a INT8, b STRING, c INT8)
  //   PRIMARY KEY (a, b, c))
  //   DISTRIBUTE BY RANGE(c, b);
  //   SPLIT ROWS [(0, "m"), (10, "r")];
  Schema schema({ ColumnSchema("a", INT8),
                  ColumnSchema("b", STRING),
                  ColumnSchema("c", INT8) },
                { ColumnId(0), ColumnId(1), ColumnId(2) },
                3);

  // Named references to the two range-partitioned columns. Using names
  // instead of bare indexes keeps every predicate below tied to the column
  // mentioned in its comment.
  const ColumnSchema& col_b = schema.column(1);
  const ColumnSchema& col_c = schema.column(2);

  PartitionSchema partition_schema;
  auto pb = PartitionSchemaPB();
  auto* range_schema = pb.mutable_range_schema();
  range_schema->add_columns()->set_name("c");
  range_schema->add_columns()->set_name("b");
  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));

  // Split rows (c, b) = (0, "m") and (10, "r").
  KuduPartialRow split1(&schema);
  ASSERT_OK(split1.SetInt8("c", 0));
  ASSERT_OK(split1.SetStringCopy("b", "m"));

  KuduPartialRow split2(&schema);
  ASSERT_OK(split2.SetInt8("c", 10));
  ASSERT_OK(split2.SetStringCopy("b", "r"));

  vector<Partition> partitions;
  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, {}, schema, &partitions));

  // Applies the specified predicates to a scan and checks that the expected
  // number of partitions remain after pruning.
  auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
    ScanSpec spec;
    for (const auto& pred : predicates) {
      spec.AddPredicate(pred);
    }
    // One pruner range is expected whenever at least one tablet remains.
    size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
                          remaining_tablets, pruner_ranges);
  };

  // Predicate operands. The predicates are built from pointers to these
  // values, so they are kept alive for the whole test.
  int8_t neg_ten = -10;
  int8_t zero = 0;
  int8_t five = 5;
  int8_t ten = 10;
  int8_t hundred = 100;
  int8_t min = INT8_MIN;
  int8_t max = INT8_MAX;

  Slice empty = "";
  Slice a = "a";
  Slice m = "m";
  Slice m0 = Slice("m\0", 2);
  Slice r = "r";
  Slice z = "z";

  // No bounds.
  Check({}, 3);

  // c < -10
  Check({ ColumnPredicate::Range(col_c, nullptr, &neg_ten) }, 1);

  // c = -10
  Check({ ColumnPredicate::Equality(col_c, &neg_ten) }, 1);

  // c < 10
  Check({ ColumnPredicate::Range(col_c, nullptr, &ten) }, 2);

  // c < 100
  Check({ ColumnPredicate::Range(col_c, nullptr, &hundred) }, 3);

  // c < MIN
  Check({ ColumnPredicate::Range(col_c, nullptr, &min) }, 0);

  // c < MAX
  Check({ ColumnPredicate::Range(col_c, nullptr, &max) }, 3);

  // c >= -10
  Check({ ColumnPredicate::Range(col_c, &neg_ten, nullptr) }, 3);

  // c >= 0
  Check({ ColumnPredicate::Range(col_c, &zero, nullptr) }, 3);

  // c >= 5
  Check({ ColumnPredicate::Range(col_c, &five, nullptr) }, 2);

  // c >= 10
  Check({ ColumnPredicate::Range(col_c, &ten, nullptr) }, 2);

  // c >= 100
  Check({ ColumnPredicate::Range(col_c, &hundred, nullptr) }, 1);

  // c >= MIN
  Check({ ColumnPredicate::Range(col_c, &min, nullptr) }, 3);

  // c >= MAX
  Check({ ColumnPredicate::Range(col_c, &max, nullptr) }, 1);

  // c = MIN
  Check({ ColumnPredicate::Equality(col_c, &min) }, 1);

  // c = MAX
  Check({ ColumnPredicate::Equality(col_c, &max) }, 1);

  // c >= -10
  // c < 0
  Check({ ColumnPredicate::Range(col_c, &neg_ten, &zero) }, 1);

  // c >= 5
  // c < 100
  Check({ ColumnPredicate::Range(col_c, &five, &hundred) }, 2);

  // b = ""
  Check({ ColumnPredicate::Equality(col_b, &empty) }, 3);

  // b >= "z"
  Check({ ColumnPredicate::Range(col_b, &z, nullptr) }, 3);

  // b < "a"
  Check({ ColumnPredicate::Range(col_b, nullptr, &a) }, 3);

  // b >= "m"
  // b < "z"
  Check({ ColumnPredicate::Range(col_b, &m, &z) }, 3);

  // c >= 10
  // b >= "r"
  Check({ ColumnPredicate::Range(col_c, &ten, nullptr),
          ColumnPredicate::Range(col_b, &r, nullptr) },
        1);

  // c >= 10
  // b < "r"
  Check({ ColumnPredicate::Range(col_c, &ten, nullptr),
          ColumnPredicate::Range(col_b, nullptr, &r) },
        2);

  // c = 10
  // b < "r"
  Check({ ColumnPredicate::Equality(col_c, &ten),
          ColumnPredicate::Range(col_b, nullptr, &r) },
        1);

  // c < 0
  // b < "m"
  Check({ ColumnPredicate::Range(col_c, nullptr, &zero),
          ColumnPredicate::Range(col_b, nullptr, &m) },
        1);

  // c < 0
  // b < "z"
  Check({ ColumnPredicate::Range(col_c, nullptr, &zero),
          ColumnPredicate::Range(col_b, nullptr, &z) },
        1);

  // c = 0
  // b = "m\0"
  Check({ ColumnPredicate::Equality(col_c, &zero),
          ColumnPredicate::Equality(col_b, &m0) },
        1);

  // c = 0
  // b < "m"
  Check({ ColumnPredicate::Equality(col_c, &zero),
          ColumnPredicate::Range(col_b, nullptr, &m) },
        1);

  // c = 0
  // b < "m\0"
  Check({ ColumnPredicate::Equality(col_c, &zero),
          ColumnPredicate::Range(col_b, nullptr, &m0) },
        2);

  // c IS NOT NULL
  Check({ ColumnPredicate::IsNotNull(col_c) }, 3);
}
TEST_F(PartitionPrunerTest, TestHashPruning) {
  // Table layout exercised by this test:
  //
  //   CREATE TABLE t
  //   (a INT8, b INT8, c INT8)
  //   PRIMARY KEY (a, b, c)
  //   DISTRIBUTE BY HASH(a) INTO 2 BUCKETS,
  //                 HASH(b, c) INTO 2 BUCKETS;
  Schema schema({ ColumnSchema("a", INT8),
                  ColumnSchema("b", INT8),
                  ColumnSchema("c", INT8) },
                { ColumnId(0), ColumnId(1), ColumnId(2) },
                3);
  const ColumnSchema& col_a = schema.column(0);
  const ColumnSchema& col_b = schema.column(1);
  const ColumnSchema& col_c = schema.column(2);

  // Two hash components: one over (a), one over (b, c). The range schema is
  // explicitly cleared.
  PartitionSchemaPB pb;
  pb.mutable_range_schema()->Clear();

  auto* hash_on_a = pb.add_hash_bucket_schemas();
  hash_on_a->add_columns()->set_name("a");
  hash_on_a->set_num_buckets(2);

  auto* hash_on_bc = pb.add_hash_bucket_schemas();
  hash_on_bc->add_columns()->set_name("b");
  hash_on_bc->add_columns()->set_name("c");
  hash_on_bc->set_num_buckets(2);

  PartitionSchema partition_schema;
  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));

  vector<Partition> partitions;
  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {},
                                              schema, &partitions));

  // Adds every predicate to a fresh scan and verifies both the number of
  // tablets left after pruning and the number of ranges held by the pruner.
  auto Check = [&] (const vector<ColumnPredicate>& predicates,
                    size_t remaining_tablets,
                    size_t pruner_ranges) {
    ScanSpec spec;
    for (const ColumnPredicate& predicate : predicates) {
      spec.AddPredicate(predicate);
    }
    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
                          remaining_tablets, pruner_ranges);
  };

  // Predicate operands; predicates are built from pointers to these values.
  int8_t zero = 0;
  int8_t one = 1;
  int8_t two = 2;

  // No bounds.
  Check({}, 4, 1);

  // a = 0
  Check({ ColumnPredicate::Equality(col_a, &zero) }, 2, 1);

  // a >= 0
  Check({ ColumnPredicate::Range(col_a, &zero, nullptr) }, 4, 1);

  // a >= 0 AND a < 1
  Check({ ColumnPredicate::Range(col_a, &zero, &one) }, 2, 1);

  // a >= 0 AND a < 2
  Check({ ColumnPredicate::Range(col_a, &zero, &two) }, 4, 1);

  // b = 1
  Check({ ColumnPredicate::Equality(col_b, &one) }, 4, 1);

  // b = 1 AND c = 2
  Check({ ColumnPredicate::Equality(col_b, &one),
          ColumnPredicate::Equality(col_c, &two) },
        2, 2);

  // a = 0 AND b = 1 AND c = 2
  Check({ ColumnPredicate::Equality(col_a, &zero),
          ColumnPredicate::Equality(col_b, &one),
          ColumnPredicate::Equality(col_c, &two) },
        1, 1);
}
TEST_F(PartitionPrunerTest, TestInListHashPruning) {
  // Table layout exercised by this test:
  //
  //   CREATE TABLE t
  //   (a INT8, b INT8, c INT8)
  //   PRIMARY KEY (a, b, c)
  //   DISTRIBUTE BY HASH(a) INTO 3 BUCKETS,
  //                 HASH(b) INTO 3 BUCKETS,
  //                 HASH(c) INTO 3 BUCKETS;
  Schema schema({ ColumnSchema("a", INT8),
                  ColumnSchema("b", INT8),
                  ColumnSchema("c", INT8) },
                { ColumnId(0), ColumnId(1), ColumnId(2) },
                3);
  const ColumnSchema& col_a = schema.column(0);
  const ColumnSchema& col_b = schema.column(1);
  const ColumnSchema& col_c = schema.column(2);

  PartitionSchemaPB pb;

  // Appends a hash component over a single column with 3 buckets and seed 0.
  const auto add_single_column_hash = [&pb] (const char* column_name) {
    auto* component = pb.add_hash_bucket_schemas();
    component->add_columns()->set_name(column_name);
    component->set_num_buckets(3);
    component->set_seed(0);
  };
  add_single_column_hash("a");
  add_single_column_hash("b");
  add_single_column_hash("c");
  pb.mutable_range_schema()->clear_columns();

  PartitionSchema partition_schema;
  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));

  vector<Partition> partitions;
  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {},
                                              schema, &partitions));

  // Adds every predicate to a fresh scan and verifies both the number of
  // tablets left after pruning and the number of ranges held by the pruner.
  auto Check = [&] (const vector<ColumnPredicate>& predicates,
                    size_t remaining_tablets,
                    size_t pruner_ranges) {
    ScanSpec spec;
    for (const ColumnPredicate& predicate : predicates) {
      spec.AddPredicate(predicate);
    }
    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
                          remaining_tablets, pruner_ranges);
  };

  // zero, one, eight are in different buckets when bucket number is 3 and seed is 0.
  int8_t zero = 0;
  int8_t one = 1;
  int8_t eight = 8;

  // IN-list operands. Each list is (re)assigned right before the predicate
  // built from it, exactly once per use.
  vector<const void*> a_in;
  vector<const void*> b_in;
  vector<const void*> c_in;

  // a IN (0, 1)
  a_in = { &zero, &one };
  Check({ ColumnPredicate::InList(col_a, &a_in) }, 18, 2);

  // a IN (0, 1, 8)
  a_in = { &zero, &one, &eight };
  Check({ ColumnPredicate::InList(col_a, &a_in) }, 27, 1);

  // b IN (0, 1)
  b_in = { &zero, &one };
  Check({ ColumnPredicate::InList(col_b, &b_in) }, 18, 6);

  // c IN (0, 1)
  c_in = { &zero, &one };
  Check({ ColumnPredicate::InList(col_c, &c_in) }, 18, 18);

  // b IN (0, 1) AND c IN (0, 1)
  b_in = { &zero, &one };
  c_in = { &zero, &one };
  Check({ ColumnPredicate::InList(col_b, &b_in),
          ColumnPredicate::InList(col_c, &c_in) },
        12, 12);

  // a IN (0, 1) AND b IN (0, 1) AND c IN (0, 1)
  a_in = { &zero, &one };
  b_in = { &zero, &one };
  c_in = { &zero, &one };
  Check({ ColumnPredicate::InList(col_a, &a_in),
          ColumnPredicate::InList(col_b, &b_in),
          ColumnPredicate::InList(col_c, &c_in) },
        8, 8);
}
TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
  // Table layout exercised by this test:
  //
  //   CREATE TABLE t
  //   (a INT8, b INT8, c INT8)
  //   PRIMARY KEY (a, b, c)
  //   DISTRIBUTE BY HASH(a) INTO 3 BUCKETS,
  //                 HASH(b, c) INTO 3 BUCKETS;
  Schema schema({ ColumnSchema("a", INT8),
                  ColumnSchema("b", INT8),
                  ColumnSchema("c", INT8) },
                { ColumnId(0), ColumnId(1), ColumnId(2) },
                3);
  const ColumnSchema& col_a = schema.column(0);
  const ColumnSchema& col_b = schema.column(1);
  const ColumnSchema& col_c = schema.column(2);

  // Two hash components, both with 3 buckets and seed 0: (a) and (b, c).
  PartitionSchemaPB pb;

  auto* hash_on_a = pb.add_hash_bucket_schemas();
  hash_on_a->add_columns()->set_name("a");
  hash_on_a->set_num_buckets(3);
  hash_on_a->set_seed(0);

  auto* hash_on_bc = pb.add_hash_bucket_schemas();
  hash_on_bc->add_columns()->set_name("b");
  hash_on_bc->add_columns()->set_name("c");
  hash_on_bc->set_num_buckets(3);
  hash_on_bc->set_seed(0);

  pb.mutable_range_schema()->clear_columns();

  PartitionSchema partition_schema;
  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));

  vector<Partition> partitions;
  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {},
                                              schema, &partitions));

  // Adds every predicate to a fresh scan and verifies both the number of
  // tablets left after pruning and the number of ranges held by the pruner.
  auto Check = [&] (const vector<ColumnPredicate>& predicates,
                    size_t remaining_tablets,
                    size_t pruner_ranges) {
    ScanSpec spec;
    for (const ColumnPredicate& predicate : predicates) {
      spec.AddPredicate(predicate);
    }
    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
                          remaining_tablets, pruner_ranges);
  };

  // zero, one, eight are in different buckets when bucket number is 3 and seed is 0.
  int8_t zero = 0;
  int8_t one = 1;
  int8_t eight = 8;

  // IN-list operands. Each list is (re)assigned right before the predicate
  // built from it, exactly once per use.
  vector<const void*> a_in;
  vector<const void*> b_in;
  vector<const void*> c_in;

  // a IN (0, 1)
  a_in = { &zero, &one };
  Check({ ColumnPredicate::InList(col_a, &a_in) }, 6, 2);

  // a IN (0, 1, 8)
  a_in = { &zero, &one, &eight };
  Check({ ColumnPredicate::InList(col_a, &a_in) }, 9, 1);

  // b IN (0, 1)
  b_in = { &zero, &one };
  Check({ ColumnPredicate::InList(col_b, &b_in) }, 9, 1);

  // c IN (0, 1)
  c_in = { &zero, &one };
  Check({ ColumnPredicate::InList(col_c, &c_in) }, 9, 1);

  // b IN (0, 1) AND c IN (0, 1)
  //   (0, 0) in bucket 2
  //   (0, 1) in bucket 2
  //   (1, 0) in bucket 1
  //   (1, 1) in bucket 0
  b_in = { &zero, &one };
  c_in = { &zero, &one };
  Check({ ColumnPredicate::InList(col_b, &b_in),
          ColumnPredicate::InList(col_c, &c_in) },
        9, 1);

  // b = 0 AND c IN (0, 1)
  c_in = { &zero, &one };
  Check({ ColumnPredicate::Equality(col_b, &zero),
          ColumnPredicate::InList(col_c, &c_in) },
        3, 3);

  // b = 1 AND c IN (0, 1)
  c_in = { &zero, &one };
  Check({ ColumnPredicate::Equality(col_b, &one),
          ColumnPredicate::InList(col_c, &c_in) },
        6, 6);

  // a IN (0, 1) AND b IN (0, 1) AND c IN (0, 1)
  a_in = { &zero, &one };
  b_in = { &zero, &one };
  c_in = { &zero, &one };
  Check({ ColumnPredicate::InList(col_a, &a_in),
          ColumnPredicate::InList(col_b, &b_in),
          ColumnPredicate::InList(col_c, &c_in) },
        6, 2);
}
TEST_F(PartitionPrunerTest, TestPruning) {
  // Table layout exercised by this test:
  //
  //   CREATE TABLE timeseries
  //   (host STRING, metric STRING, time UNIXTIME_MICROS, value DOUBLE)
  //   PRIMARY KEY (host, metric, time)
  //   PARTITION BY RANGE (time) (PARTITION VALUES < 10,
  //                              PARTITION VALUES >= 10)
  //                HASH (host, metric) 2 PARTITIONS;
  Schema schema({ ColumnSchema("host", STRING),
                  ColumnSchema("metric", STRING),
                  ColumnSchema("time", UNIXTIME_MICROS),
                  ColumnSchema("value", DOUBLE) },
                { ColumnId(0), ColumnId(1), ColumnId(2), ColumnId(3) },
                3);
  const ColumnSchema& host_col = schema.column(0);
  const ColumnSchema& metric_col = schema.column(1);
  const ColumnSchema& time_col = schema.column(2);

  // Range partitioning on 'time' combined with one hash component over
  // (host, metric) with 2 buckets.
  PartitionSchemaPB pb;
  pb.mutable_range_schema()->add_columns()->set_name("time");
  auto* host_metric_hash = pb.add_hash_bucket_schemas();
  host_metric_hash->add_columns()->set_name("host");
  host_metric_hash->add_columns()->set_name("metric");
  host_metric_hash->set_num_buckets(2);

  PartitionSchema partition_schema;
  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));

  // Single range split at time = 10.
  KuduPartialRow time_split(&schema);
  ASSERT_OK(time_split.SetUnixTimeMicros("time", 10));

  vector<Partition> partitions;
  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>{ time_split }, {}, {},
                                              schema, &partitions));
  ASSERT_EQ(4, partitions.size());

  // Builds a scan from the given predicates and partition key bounds
  // (inclusive lower, exclusive upper), then verifies both the number of
  // tablets left after pruning and the number of ranges held by the pruner.
  auto Check = [&] (const vector<ColumnPredicate>& predicates,
                    string lower_bound_partition_key,
                    string upper_bound_partition_key,
                    size_t remaining_tablets,
                    size_t pruner_ranges) {
    ScanSpec spec;
    spec.SetLowerBoundPartitionKey(lower_bound_partition_key);
    spec.SetExclusiveUpperBoundPartitionKey(upper_bound_partition_key);
    for (const ColumnPredicate& predicate : predicates) {
      spec.AddPredicate(predicate);
    }
    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
                          remaining_tablets, pruner_ranges);
  };

  // Predicate operands; predicates are built from pointers to these values.
  Slice value_a = "a";
  int64_t nine = 9;
  int64_t ten = 10;
  int64_t twenty = 20;

  // Four-byte partition key used as a bound in the cases labelled
  // "(hash=1)" below.
  const string hash_one_key("\0\0\0\1", 4);

  // host = "a" AND metric = "a" AND timestamp >= 9
  Check({ ColumnPredicate::Equality(host_col, &value_a),
          ColumnPredicate::Equality(metric_col, &value_a),
          ColumnPredicate::Range(time_col, &nine, nullptr) },
        "", "",
        2, 1);

  // host = "a" AND metric = "a" AND timestamp >= 10 AND timestamp < 20
  Check({ ColumnPredicate::Equality(host_col, &value_a),
          ColumnPredicate::Equality(metric_col, &value_a),
          ColumnPredicate::Range(time_col, &ten, &twenty) },
        "", "",
        1, 1);

  // host = "a" AND metric = "a" AND timestamp < 10
  Check({ ColumnPredicate::Equality(host_col, &value_a),
          ColumnPredicate::Equality(metric_col, &value_a),
          ColumnPredicate::Range(time_col, nullptr, &ten) },
        "", "",
        1, 1);

  // host = "a" AND metric = "a" AND timestamp >= 10
  Check({ ColumnPredicate::Equality(host_col, &value_a),
          ColumnPredicate::Equality(metric_col, &value_a),
          ColumnPredicate::Range(time_col, &ten, nullptr) },
        "", "",
        1, 1);

  // host = "a" AND metric = "a" AND timestamp = 10
  Check({ ColumnPredicate::Equality(host_col, &value_a),
          ColumnPredicate::Equality(metric_col, &value_a),
          ColumnPredicate::Equality(time_col, &ten) },
        "", "",
        1, 1);

  // partition key < (hash=1)
  Check({}, "", hash_one_key, 2, 1);

  // partition key >= (hash=1)
  Check({}, hash_one_key, "", 2, 1);

  // timestamp = 10 AND partition key < (hash=1)
  Check({ ColumnPredicate::Equality(time_col, &ten) },
        "", hash_one_key, 1, 1);

  // timestamp = 10 AND partition key >= (hash=1)
  Check({ ColumnPredicate::Equality(time_col, &ten) },
        hash_one_key, "", 1, 1);
}
TEST_F(PartitionPrunerTest, TestKudu2173) {
  // Table layout exercised by this test:
  //
  //   CREATE TABLE t
  //   (a INT8, b INT8, PRIMARY KEY (a, b))
  //   DISTRIBUTE BY RANGE(a)
  //   SPLIT ROWS [(10)]
  Schema schema({ ColumnSchema("a", INT8),
                  ColumnSchema("b", INT8) },
                { ColumnId(0), ColumnId(1) },
                2);
  const ColumnSchema& col_a = schema.column(0);
  const ColumnSchema& col_b = schema.column(1);

  // Range partitioning on 'a' only.
  PartitionSchemaPB pb;
  pb.mutable_range_schema()->add_columns()->set_name("a");

  PartitionSchema partition_schema;
  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));

  // Single split row at a = 10.
  KuduPartialRow split_at_ten(&schema);
  ASSERT_OK(split_at_ten.SetInt8("a", 10));

  vector<Partition> partitions;
  ASSERT_OK(partition_schema.CreatePartitions({ split_at_ten }, {}, {}, schema, &partitions));

  // Adds every predicate to a fresh scan and verifies how many partitions
  // are left after pruning.
  auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
    ScanSpec spec;
    for (const ColumnPredicate& predicate : predicates) {
      spec.AddPredicate(predicate);
    }
    // One pruner range is expected whenever at least one tablet remains.
    const size_t pruner_ranges = remaining_tablets > 0 ? 1 : 0;
    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
                          remaining_tablets, pruner_ranges);
  };

  // Predicate operands; predicates are built from pointers to these values.
  int8_t eleven = 11;
  int8_t max = INT8_MAX;

  // a < 11
  Check({ ColumnPredicate::Range(col_a, nullptr, &eleven) }, 2);

  // a < 11 AND b < 11
  Check({ ColumnPredicate::Range(col_a, nullptr, &eleven),
          ColumnPredicate::Range(col_b, nullptr, &eleven) },
        2);

  // a < max
  Check({ ColumnPredicate::Range(col_a, nullptr, &max) }, 2);

  // a < max AND b < 11
  Check({ ColumnPredicate::Range(col_a, nullptr, &max),
          ColumnPredicate::Range(col_b, nullptr, &eleven) },
        2);
}
} // namespace kudu