blob: b91c363c99635d26221281651e4de9724c3b6983 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <initializer_list>
#include <iterator>
#include <limits>
#include <memory>
#include <ostream>
#include <random>
#include <string>
#include <type_traits>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_predicate.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/strings/escaping.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/util/block_bloom_filter.h"
#include "kudu/util/decimal_util.h"
#include "kudu/util/hash.pb.h"
#include "kudu/util/int128.h"
#include "kudu/util/metrics.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_int32(predicate_effectivess_num_skip_blocks);
METRIC_DECLARE_counter(scanner_predicates_disabled);
METRIC_DECLARE_entity(tablet);
using std::count_if;
using std::deque;
using std::numeric_limits;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace client {
using cluster::InternalMiniCluster;
using cluster::InternalMiniClusterOptions;
using sp::shared_ptr;
class PredicateTest : public KuduTest {
protected:
static constexpr float kBloomFilterFalsePositiveProb = 0.01;
void SetUp() override {
// Set up the mini cluster
cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions()));
ASSERT_OK(cluster_->Start());
ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
}
// Creates a key/value table schema with an int64 key and value of the
// specified type.
shared_ptr<KuduTable> CreateAndOpenTable(KuduColumnSchema::DataType value_type) {
KuduSchema schema;
{
KuduSchemaBuilder builder;
builder.AddColumn("key")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
switch (value_type) {
case KuduColumnSchema::VARCHAR:
builder.AddColumn("value")->Type(value_type)
->Length(40);
break;
case KuduColumnSchema::DECIMAL:
builder.AddColumn("value")->Type(value_type)
->Precision(kMaxDecimal128Precision)->Scale(2);
break;
default:
builder.AddColumn("value")->Type(value_type);
break;
}
CHECK_OK(builder.Build(&schema));
}
unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
CHECK_OK(table_creator->table_name("table")
.schema(&schema)
.set_range_partition_columns({ "key" })
.num_replicas(1)
.Create());
shared_ptr<KuduTable> table;
CHECK_OK(client_->OpenTable("table", &table));
return table;
}
// Creates a new session in automatic background flush mode.
shared_ptr<KuduSession> CreateSession() {
shared_ptr<KuduSession> session = client_->NewSession();
session->SetTimeoutMillis(10000);
CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
return session;
}
// Count the rows in a table which satisfy the specified predicates.
int DoCountRows(const shared_ptr<KuduTable>& table,
const vector<KuduPredicate*>& predicates) {
KuduScanner scanner(table.get());
for (KuduPredicate* predicate : predicates) {
CHECK_OK(scanner.AddConjunctPredicate(predicate));
}
CHECK_OK(scanner.Open());
int rows = 0;
while (scanner.HasMoreRows()) {
KuduScanBatch batch;
CHECK_OK(scanner.NextBatch(&batch));
rows += batch.NumRows();
}
return rows;
}
// Count the rows in a table which satisfy the specified predicates. This
// also does a separate scan with cloned predicates, in order to test that
// cloned predicates behave exactly as the original.
int CountRows(const shared_ptr<KuduTable>& table,
const vector<KuduPredicate*>& predicates) {
vector<KuduPredicate*> cloned_predicates;
cloned_predicates.reserve(predicates.size());
for (KuduPredicate* pred : predicates) {
cloned_predicates.push_back(pred->Clone());
}
int cloned_count = DoCountRows(table, cloned_predicates);
int count = DoCountRows(table, predicates);
CHECK_EQ(count, cloned_count);
return count;
}
template <typename T>
int CountMatchedRows(const vector<T>& values, const vector<T>& test_values) {
int count = 0;
for (const T& v : values) {
if (std::any_of(test_values.begin(), test_values.end(),
[&] (const T& t) { return t == v; })) {
count++;
}
}
return count;
}
// This function helps distinguish floating point values like -0.0 against 0.0.
template<typename F>
int CountMatchedFloatingPointRowsWithSignBit(const vector<F>& values,
const vector<F>& test_values) {
static_assert(std::is_floating_point<F>::value,
"This function must only be used with floating point data types");
int count = 0;
for (const F& v : values) {
if (std::any_of(test_values.begin(), test_values.end(),
[&] (const F& f) { return f == v && std::signbit(f) == std::signbit(v); })) {
count++;
}
}
return count;
}
// Returns a vector of ints from -50 (inclusive) to 50 (exclusive), and
// boundary values.
template <typename T>
vector<T> CreateIntValues() {
vector<T> values;
for (int i = -50; i < 50; i++) {
values.push_back(i);
}
values.push_back(numeric_limits<T>::min());
values.push_back(numeric_limits<T>::min() + 1);
values.push_back(numeric_limits<T>::max() - 1);
values.push_back(numeric_limits<T>::max());
return values;
}
// Returns a vector of ints for testing as predicate boundaries.
template <typename T>
vector<T> CreateIntTestValues() {
return {
numeric_limits<T>::min(),
numeric_limits<T>::min() + 1,
-51,
-50,
0,
49,
50,
numeric_limits<T>::max() - 1,
numeric_limits<T>::max(),
};
}
// Returns a vector of floating point numbers from -50.50 (inclusive) to 49.49
// (exclusive) (100 values), plus min, max, two normals around 0, two
// subnormals around 0, positive and negative infinity, and NaN.
template <typename T>
vector<T> CreateFloatingPointValues() {
vector<T> values;
for (int i = -50; i < 50; i++) {
values.push_back(static_cast<T>(i) + static_cast<T>(i) / 100);
}
// Add special values (listed in ascending order)
values.push_back(-numeric_limits<T>::infinity());
values.push_back(numeric_limits<T>::lowest());
values.push_back(-numeric_limits<T>::min());
values.push_back(-numeric_limits<T>::denorm_min());
values.push_back(-0.0);
values.push_back(numeric_limits<T>::denorm_min());
values.push_back(numeric_limits<T>::min());
values.push_back(numeric_limits<T>::max());
values.push_back(numeric_limits<T>::infinity());
// Add special NaN value
// TODO: uncomment after fixing KUDU-1386
// values.push_back(numeric_limits<T>::quiet_NaN());
return values;
}
/// Returns a vector of floating point numbers for creating test predicates.
template <typename T>
vector<T> CreateFloatingPointTestValues() {
return {
-numeric_limits<T>::infinity(),
numeric_limits<T>::lowest(),
-100.0,
-1.1,
-1.0,
-numeric_limits<T>::min(),
-numeric_limits<T>::denorm_min(),
-0.0,
0.0,
numeric_limits<T>::denorm_min(),
numeric_limits<T>::min(),
1.0,
1.1,
100.0,
numeric_limits<T>::max(),
numeric_limits<T>::infinity(),
// TODO: uncomment after fixing KUDU-1386
// numeric_limits<T>::quiet_NaN();
};
}
// Returns a vector of decimal(4, 2) numbers from -50.50 (inclusive) to 50.50
// (exclusive) (100 values) and boundary values.
vector<int128_t> CreateDecimalValues() {
vector<int128_t> values;
for (int i = -50; i < 50; i++) {
values.push_back(i * 100 + i);
}
values.push_back(-9999);
values.push_back(-9998);
values.push_back(9998);
values.push_back(9999);
return values;
}
/// Returns a vector of decimal numbers for creating test predicates.
vector<int128_t> CreateDecimalTestValues() {
return {
-9999,
-9998,
-5100,
-5000,
0,
4900,
5000,
9998,
9999,
};
}
// Returns a vector of string values.
vector<string> CreateStringValues() {
return {
string(),
string("\0", 1),
string("\0\0", 2),
string("a", 1),
string("a\0", 2),
string("a\0a", 3),
string("aa\0", 3),
};
}
static KuduBloomFilter* CreateBloomFilter(
int nkeys,
float false_positive_prob = kBloomFilterFalsePositiveProb) {
KuduBloomFilterBuilder builder(nkeys);
builder.false_positive_probability(false_positive_prob);
KuduBloomFilter* bf;
CHECK_OK(builder.Build(&bf));
return bf;
}
static unique_ptr<BlockBloomFilter> CreateDirectBloomFilter(
int nkeys,
float false_positive_prob = kBloomFilterFalsePositiveProb) {
unique_ptr<BlockBloomFilter> bf(
new BlockBloomFilter(DefaultBlockBloomFilterBufferAllocator::GetSingleton()));
CHECK_OK(bf->Init(BlockBloomFilter::MinLogSpace(nkeys, false_positive_prob),
kudu::FAST_HASH, 0));
return bf;
}
template<typename BloomFilterType, typename Collection>
static void InsertValuesInBloomFilter(BloomFilterType* bloom_filter, const Collection& values) {
for (const auto& v : values) {
bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), sizeof(v)));
}
}
template<class Collection>
static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& values) {
KuduBloomFilter* bloom_filter = CreateBloomFilter(values.size());
InsertValuesInBloomFilter(bloom_filter, values);
return bloom_filter;
}
template<class Collection>
static unique_ptr<BlockBloomFilter> CreateDirectBloomFilterWithValues(const Collection& values) {
unique_ptr<BlockBloomFilter> bloom_filter = CreateDirectBloomFilter(values.size());
InsertValuesInBloomFilter(bloom_filter.get(), values);
return bloom_filter;
}
void CheckInBloomFilterPredicate(const shared_ptr<KuduTable>& table,
KuduBloomFilter* in_bloom_filter,
int expected_count) {
vector<KuduBloomFilter*> bf_vec = { in_bloom_filter };
auto* bf_predicate = table->NewInBloomFilterPredicate("value", &bf_vec);
ASSERT_TRUE(bf_vec.empty());
ASSERT_EQ(expected_count, CountRows(table, { bf_predicate }));
}
// Check integer predicates against the specified table. The table must have
// key/value rows with values from CreateIntValues, plus one null value.
template <typename T>
void CheckIntPredicates(const shared_ptr<KuduTable>& table) {
vector<T> values = CreateIntValues<T>();
vector<T> test_values = CreateIntTestValues<T>();
ASSERT_EQ(values.size() + 1, CountRows(table, {}));
for (T v : test_values) {
SCOPED_TRACE(Substitute("test value: $0", v));
{ // value = v
int count = count_if(values.begin(), values.end(),
[&] (T value) { return value == v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::EQUAL,
KuduValue::FromInt(v)),
}));
}
{ // value >= v
int count = count_if(values.begin(), values.end(),
[&] (T value) { return value >= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromInt(v)),
}));
}
{ // value <= v
int count = count_if(values.begin(), values.end(),
[&] (T value) { return value <= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromInt(v)),
}));
}
{ // value > v
int count = count_if(values.begin(), values.end(),
[&] (T value) { return value > v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER,
KuduValue::FromInt(v)),
}));
}
{ // value < v
int count = count_if(values.begin(), values.end(),
[&] (T value) { return value < v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::LESS,
KuduValue::FromInt(v)),
}));
}
{ // value >= 0
// value <= v
int count = count_if(values.begin(), values.end(),
[&] (T value) { return value >= 0 && value <= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromInt(0)),
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromInt(v)),
}));
}
{ // value >= v
// value <= 0
int count = count_if(values.begin(), values.end(),
[&] (T value) { return value >= v && value <= 0; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromInt(v)),
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromInt(0)),
}));
}
}
// IN list and IN Bloom filter predicates
std::mt19937 gen(SeedRandom());
std::shuffle(test_values.begin(), test_values.end(), gen);
for (auto end = test_values.begin(); end <= test_values.end(); end++) {
vector<KuduValue*> vals;
auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));
for (auto itr = test_values.begin(); itr != end; itr++) {
vals.push_back(KuduValue::FromInt(*itr));
auto key = *itr;
bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
}
int count = CountMatchedRows<T>(values, vector<T>(test_values.begin(), end));
ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
CheckInBloomFilterPredicate(table, bloom_filter, count);
}
// IS NOT NULL predicate
ASSERT_EQ(CountRows(table, {}) - 1,
CountRows(table, { table->NewIsNotNullPredicate("value") }));
// IS NULL predicate
ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}
// Check string predicates against the specified table.
void CheckStringPredicates(const shared_ptr<KuduTable>& table, const vector<string>& values) {
ASSERT_EQ(values.size() + 1, CountRows(table, {}));
// Add some additional values to check against.
vector<string> test_values = values;
test_values.emplace_back("aa");
test_values.emplace_back("\1", 1);
test_values.emplace_back("a\1", 1);
for (const string& v : test_values) {
SCOPED_TRACE(Substitute("test value: '$0'", strings::CHexEscape(v)));
{ // value = v
int count = count_if(values.begin(), values.end(),
[&] (const string& value) { return value == v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::EQUAL,
KuduValue::CopyString(v)),
}));
}
{ // value >= v
int count = count_if(values.begin(), values.end(),
[&] (const string& value) { return value >= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::CopyString(v)),
}));
}
{ // value <= v
int count = count_if(values.begin(), values.end(),
[&] (const string& value) { return value <= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::CopyString(v)),
}));
}
{ // value > v
int count = count_if(values.begin(), values.end(),
[&] (const string& value) { return value > v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER,
KuduValue::CopyString(v)),
}));
}
{ // value < v
int count = count_if(values.begin(), values.end(),
[&] (const string& value) { return value < v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::LESS,
KuduValue::CopyString(v)),
}));
}
{ // value >= "a"
// value <= v
int count = count_if(values.begin(), values.end(),
[&] (const string& value) { return value >= "a" && value <= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::CopyString("a")),
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::CopyString(v)),
}));
}
{ // value >= v
// value <= "a"
int count = count_if(values.begin(), values.end(),
[&] (const string& value) { return value >= v && value <= "a"; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::CopyString(v)),
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::CopyString("a")),
}));
}
}
// IN list and IN Bloom filter predicates
std::mt19937 gen(SeedRandom());
std::shuffle(test_values.begin(), test_values.end(), gen);
for (auto end = test_values.begin(); end <= test_values.end(); end++) {
vector<KuduValue*> vals;
auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));
for (auto itr = test_values.begin(); itr != end; itr++) {
vals.push_back(KuduValue::CopyString(*itr));
bloom_filter->Insert(*itr);
}
int count = CountMatchedRows<string>(values, vector<string>(test_values.begin(), end));
ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
CheckInBloomFilterPredicate(table, bloom_filter, count);
}
// IS NOT NULL predicate
ASSERT_EQ(CountRows(table, {}) - 1,
CountRows(table, { table->NewIsNotNullPredicate("value") }));
// IS NULL predicate
ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}
string GetTheOnlyTabletId() {
// Cluster is setup with single tablet server and a single table.
CHECK_EQ(1, cluster_->num_tablet_servers());
const auto* mini_tserver = cluster_->mini_tablet_server(0);
const auto& tablets = mini_tserver->ListTablets();
CHECK_EQ(1, tablets.size());
return tablets[0];
}
// Function to verify ineffective disabled predicates on specified tablet.
void CheckDisabledPredicates(const string& tablet_id, int64_t expected_disabled_predicates) {
// Cluster is setup with single tablet server and a single table.
ASSERT_EQ(1, cluster_->num_tablet_servers());
const auto* mini_tserver = cluster_->mini_tablet_server(0);
int64_t predicates_disabled = 0;
ASSERT_OK(itest::GetInt64Metric(HostPort(mini_tserver->bound_http_addr()),
&METRIC_ENTITY_tablet,
tablet_id.c_str(),
&METRIC_scanner_predicates_disabled,
"value",
&predicates_disabled));
ASSERT_EQ(expected_disabled_predicates, predicates_disabled);
}
shared_ptr<KuduClient> client_;
unique_ptr<InternalMiniCluster> cluster_;
};
TEST_F(PredicateTest, TestBoolPredicates) {
shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::BOOL);
shared_ptr<KuduSession> session = CreateSession();
int i = 0;
for (bool b : { false, true }) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetBool("value", b));
ASSERT_OK(session->Apply(insert.release()));
}
// Insert null value
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(session->Apply(insert.release()));
ASSERT_OK(session->Flush());
ASSERT_EQ(3, CountRows(table, {}));
{ // value = false
KuduPredicate* pred = table->NewComparisonPredicate("value",
KuduPredicate::EQUAL,
KuduValue::FromBool(false));
ASSERT_EQ(1, CountRows(table, { pred }));
}
{ // value = true
KuduPredicate* pred = table->NewComparisonPredicate("value",
KuduPredicate::EQUAL,
KuduValue::FromBool(true));
ASSERT_EQ(1, CountRows(table, { pred }));
}
{ // value >= true
KuduPredicate* pred = table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromBool(true));
ASSERT_EQ(1, CountRows(table, { pred }));
}
{ // value >= false
KuduPredicate* pred = table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromBool(false));
ASSERT_EQ(2, CountRows(table, { pred }));
}
{ // value <= false
KuduPredicate* pred = table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromBool(false));
ASSERT_EQ(1, CountRows(table, { pred }));
}
{ // value <= true
KuduPredicate* pred = table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromBool(true));
ASSERT_EQ(2, CountRows(table, { pred }));
}
{ // value > true
KuduPredicate* pred = table->NewComparisonPredicate("value",
KuduPredicate::GREATER,
KuduValue::FromBool(true));
ASSERT_EQ(0, CountRows(table, { pred }));
}
{ // value > false
KuduPredicate* pred = table->NewComparisonPredicate("value",
KuduPredicate::GREATER,
KuduValue::FromBool(false));
ASSERT_EQ(1, CountRows(table, { pred }));
}
{ // value < false
KuduPredicate* pred = table->NewComparisonPredicate("value",
KuduPredicate::LESS,
KuduValue::FromBool(false));
ASSERT_EQ(0, CountRows(table, { pred }));
}
{ // value < true
KuduPredicate* pred = table->NewComparisonPredicate("value",
KuduPredicate::LESS,
KuduValue::FromBool(true));
ASSERT_EQ(1, CountRows(table, { pred }));
}
{ // value IN ()
vector<KuduValue*> values = { };
KuduPredicate* pred = table->NewInListPredicate("value", &values);
ASSERT_EQ(0, CountRows(table, { pred }));
}
{ // value IN (true)
vector<KuduValue*> values = { KuduValue::FromBool(true) };
KuduPredicate* pred = table->NewInListPredicate("value", &values);
ASSERT_EQ(1, CountRows(table, { pred }));
}
{ // value IN (false)
vector<KuduValue*> values = { KuduValue::FromBool(false) };
KuduPredicate* pred = table->NewInListPredicate("value", &values);
ASSERT_EQ(1, CountRows(table, { pred }));
}
{ // value IN (true, false)
vector<KuduValue*> values = { KuduValue::FromBool(false), KuduValue::FromBool(true) };
KuduPredicate* pred = table->NewInListPredicate("value", &values);
ASSERT_EQ(2, CountRows(table, { pred }));
}
{ // empty BloomFilter
auto* bloom_filter = CreateBloomFilter(0);
CheckInBloomFilterPredicate(table, bloom_filter, 0);
}
{ // vector with no BloomFilters
vector<KuduBloomFilter*> no_bloom_filters = {};
auto* bf_predicate = table->NewInBloomFilterPredicate("value", &no_bloom_filters);
ASSERT_TRUE(no_bloom_filters.empty());
KuduScanner scanner(table.get());
Status s = scanner.AddConjunctPredicate(bf_predicate);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_STR_CONTAINS(s.ToString(), "No Bloom filters supplied");
}
{ // BloomFilter with (true)
auto* bloom_filter = CreateBloomFilter(1);
bool key = true;
bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
CheckInBloomFilterPredicate(table, bloom_filter, 1);
}
{ // BloomFilter with (false)
auto* bloom_filter = CreateBloomFilter(1);
bool key = false;
bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
CheckInBloomFilterPredicate(table, bloom_filter, 1);
}
{ // BloomFilter with (true, false)
auto* bloom_filter = CreateBloomFilter(2);
bool key = true;
bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
key = false;
bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
CheckInBloomFilterPredicate(table, bloom_filter, 2);
}
// IS NOT NULL predicate
ASSERT_EQ(CountRows(table, {}) - 1,
CountRows(table, { table->NewIsNotNullPredicate("value") }));
// IS NULL predicate
ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}
TEST_F(PredicateTest, TestInt8Predicates) {
shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT8);
shared_ptr<KuduSession> session = CreateSession();
int i = 0;
for (int8_t value : CreateIntValues<int8_t>()) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetInt8("value", value));
ASSERT_OK(session->Apply(insert.release()));
}
unique_ptr<KuduInsert> null_insert(table->NewInsert());
ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
ASSERT_OK(session->Apply(null_insert.release()));
ASSERT_OK(session->Flush());
CheckIntPredicates<int8_t>(table);
}
TEST_F(PredicateTest, TestInt16Predicates) {
shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT16);
shared_ptr<KuduSession> session = CreateSession();
int i = 0;
for (int16_t value : CreateIntValues<int16_t>()) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetInt16("value", value));
ASSERT_OK(session->Apply(insert.release()));
}
unique_ptr<KuduInsert> null_insert(table->NewInsert());
ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
ASSERT_OK(session->Apply(null_insert.release()));
ASSERT_OK(session->Flush());
CheckIntPredicates<int16_t>(table);
}
TEST_F(PredicateTest, TestInt32Predicates) {
shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT32);
shared_ptr<KuduSession> session = CreateSession();
int i = 0;
for (int32_t value : CreateIntValues<int32_t>()) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetInt32("value", value));
ASSERT_OK(session->Apply(insert.release()));
}
unique_ptr<KuduInsert> null_insert(table->NewInsert());
ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
ASSERT_OK(session->Apply(null_insert.release()));
ASSERT_OK(session->Flush());
CheckIntPredicates<int32_t>(table);
}
TEST_F(PredicateTest, TestInt64Predicates) {
shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT64);
shared_ptr<KuduSession> session = CreateSession();
int i = 0;
for (int64_t value : CreateIntValues<int64_t>()) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetInt64("value", value));
ASSERT_OK(session->Apply(insert.release()));
}
unique_ptr<KuduInsert> null_insert(table->NewInsert());
ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
ASSERT_OK(session->Apply(null_insert.release()));
ASSERT_OK(session->Flush());
CheckIntPredicates<int64_t>(table);
}
TEST_F(PredicateTest, TestTimestampPredicates) {
shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::UNIXTIME_MICROS);
shared_ptr<KuduSession> session = CreateSession();
int i = 0;
for (int64_t value : CreateIntValues<int64_t>()) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetUnixTimeMicros("value", value));
ASSERT_OK(session->Apply(insert.release()));
}
unique_ptr<KuduInsert> null_insert(table->NewInsert());
ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
ASSERT_OK(session->Apply(null_insert.release()));
ASSERT_OK(session->Flush());
CheckIntPredicates<int64_t>(table);
}
TEST_F(PredicateTest, TestDatePredicates) {
shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::DATE);
shared_ptr<KuduSession> session = CreateSession();
int i = 0;
for (int32_t value : CreateIntValues<int32_t>()) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetDate("value", value));
ASSERT_OK(session->Apply(insert.release()));
}
unique_ptr<KuduInsert> null_insert(table->NewInsert());
ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
ASSERT_OK(session->Apply(null_insert.release()));
ASSERT_OK(session->Flush());
CheckIntPredicates<int32_t>(table);
}
TEST_F(PredicateTest, TestFloatPredicates) {
shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::FLOAT);
shared_ptr<KuduSession> session = CreateSession();
vector<float> values = CreateFloatingPointValues<float>();
vector<float> test_values = CreateFloatingPointTestValues<float>();
int i = 0;
for (float value : values) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetFloat("value", value));
ASSERT_OK(session->Apply(insert.release()));
}
unique_ptr<KuduInsert> null_insert(table->NewInsert());
ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
ASSERT_OK(session->Apply(null_insert.release()));
ASSERT_OK(session->Flush());
ASSERT_EQ(values.size() + 1, CountRows(table, {}));
for (float v : test_values) {
SCOPED_TRACE(Substitute("test value: $0", v));
{ // value = v
int count = count_if(values.begin(), values.end(),
[&] (float value) { return value == v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value", KuduPredicate::EQUAL, KuduValue::FromFloat(v)),
}));
}
{ // value >= v
int count = count_if(values.begin(), values.end(),
[&] (float value) { return value >= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromFloat(v)),
}));
}
{ // value <= v
int count = count_if(values.begin(), values.end(),
[&] (float value) { return value <= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromFloat(v)),
}));
}
{ // value > v
int count = count_if(values.begin(), values.end(),
[&] (float value) { return value > v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER,
KuduValue::FromFloat(v)),
}));
}
{ // value < v
int count = count_if(values.begin(), values.end(),
[&] (float value) { return value < v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::LESS,
KuduValue::FromFloat(v)),
}));
}
{ // value >= 0
// value <= v
int count = count_if(values.begin(), values.end(),
[&] (float value) { return value >= 0.0 && value <= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromFloat(0.0)),
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromFloat(v)),
}));
}
{ // value >= v
// value <= 0.0
int count = count_if(values.begin(), values.end(),
[&] (float value) { return value >= v && value <= 0.0; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromFloat(v)),
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromFloat(0.0)),
}));
}
}
// IN list and IN Bloom filter predicates
std::mt19937 gen(SeedRandom());
std::shuffle(test_values.begin(), test_values.end(), gen);
for (auto end = test_values.begin(); end <= test_values.end(); end++) {
vector<KuduValue*> vals;
auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));
for (auto itr = test_values.begin(); itr != end; itr++) {
vals.push_back(KuduValue::FromFloat(*itr));
auto key = *itr;
bloom_filter->Insert(Slice(reinterpret_cast<const uint8*>(&key), sizeof(key)));
}
int count = CountMatchedRows(values, vector<float>(test_values.begin(), end));
ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
int count_with_sign_bit = CountMatchedFloatingPointRowsWithSignBit(
values, vector<float>(test_values.begin(), end));
CheckInBloomFilterPredicate(table, bloom_filter, count_with_sign_bit);
}
// IS NOT NULL predicate
ASSERT_EQ(values.size(),
CountRows(table, { table->NewIsNotNullPredicate("value") }));
// IS NULL predicate
ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}
TEST_F(PredicateTest, TestDoublePredicates) {
shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::DOUBLE);
shared_ptr<KuduSession> session = CreateSession();
vector<double> values = CreateFloatingPointValues<double>();
vector<double> test_values = CreateFloatingPointTestValues<double>();
int i = 0;
for (double value : values) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetDouble("value", value));
ASSERT_OK(session->Apply(insert.release()));
}
unique_ptr<KuduInsert> null_insert(table->NewInsert());
ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
ASSERT_OK(session->Apply(null_insert.release()));
ASSERT_OK(session->Flush());
ASSERT_EQ(values.size() + 1, CountRows(table, {}));
for (double v : test_values) {
SCOPED_TRACE(Substitute("test value: $0", v));
{ // value = v
int count = count_if(values.begin(), values.end(),
[&] (double value) { return value == v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value", KuduPredicate::EQUAL, KuduValue::FromDouble(v)),
}));
}
{ // value >= v
int count = count_if(values.begin(), values.end(),
[&] (double value) { return value >= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromDouble(v)),
}));
}
{ // value <= v
int count = count_if(values.begin(), values.end(),
[&] (double value) { return value <= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromDouble(v)),
}));
}
{ // value > v
int count = count_if(values.begin(), values.end(),
[&] (double value) { return value > v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER,
KuduValue::FromDouble(v)),
}));
}
{ // value < v
int count = count_if(values.begin(), values.end(),
[&] (double value) { return value < v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::LESS,
KuduValue::FromDouble(v)),
}));
}
{ // value >= 0.0
// value <= v
int count = count_if(values.begin(), values.end(),
[&] (double value) { return value >= 0.0 && value <= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromDouble(0.0)),
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromDouble(v)),
}));
}
{ // value >= v
// value <= 0.0
int count = count_if(values.begin(), values.end(),
[&] (double value) { return value >= v && value <= 0.0; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromDouble(v)),
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromDouble(0.0)),
}));
}
}
// IN list and IN Bloom filter predicates
std::mt19937 gen(SeedRandom());
std::shuffle(test_values.begin(), test_values.end(), gen);
for (auto end = test_values.begin(); end <= test_values.end(); end++) {
vector<KuduValue*> vals;
auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));
for (auto itr = test_values.begin(); itr != end; itr++) {
vals.push_back(KuduValue::FromDouble(*itr));
auto key = *itr;
bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
}
int count = CountMatchedRows(values, vector<double>(test_values.begin(), end));
ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
int count_with_sign_bit = CountMatchedFloatingPointRowsWithSignBit(
values, vector<double>(test_values.begin(), end));
CheckInBloomFilterPredicate(table, bloom_filter, count_with_sign_bit);
}
// IS NOT NULL predicate
ASSERT_EQ(values.size(),
CountRows(table, { table->NewIsNotNullPredicate("value") }));
// IS NULL predicate
ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}
TEST_F(PredicateTest, TestDecimalPredicates) {
KuduSchema schema;
{
KuduSchemaBuilder builder;
builder.AddColumn("key")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
builder.AddColumn("value")->Type(KuduColumnSchema::DECIMAL)
->Precision(kMaxDecimal128Precision)->Scale(2);
CHECK_OK(builder.Build(&schema));
}
unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
CHECK_OK(table_creator->table_name("table")
.schema(&schema)
.set_range_partition_columns({ "key" })
.num_replicas(1)
.Create());
shared_ptr<KuduTable> table;
CHECK_OK(client_->OpenTable("table", &table));
shared_ptr<KuduSession> session = CreateSession();
vector<int128_t> values = CreateDecimalValues();
vector<int128_t> test_values = CreateDecimalTestValues();
int i = 0;
for (int128_t value : values) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetUnscaledDecimal("value", value));
ASSERT_OK(session->Apply(insert.release()));
}
unique_ptr<KuduInsert> null_insert(table->NewInsert());
ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
ASSERT_OK(session->Apply(null_insert.release()));
ASSERT_OK(session->Flush());
ASSERT_EQ(values.size() + 1, CountRows(table, {}));
for (int128_t v : test_values) {
SCOPED_TRACE(Substitute("test value: $0", v));
{ // value = v
int count = count_if(values.begin(), values.end(),
[&] (int128_t value) { return value == v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value", KuduPredicate::EQUAL,
KuduValue::FromDecimal(v, 2)),
}));
}
{ // value >= v
int count = count_if(values.begin(), values.end(),
[&] (int128_t value) { return value >= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromDecimal(v, 2)),
}));
}
{ // value <= v
int count = count_if(values.begin(), values.end(),
[&] (int128_t value) { return value <= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromDecimal(v, 2)),
}));
}
{ // value > v
int count = count_if(values.begin(), values.end(),
[&] (int128_t value) { return value > v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER,
KuduValue::FromDecimal(v, 2)),
}));
}
{ // value < v
int count = count_if(values.begin(), values.end(),
[&] (int128_t value) { return value < v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::LESS,
KuduValue::FromDecimal(v, 2)),
}));
}
{ // value >= 0
// value <= v
int count = count_if(values.begin(), values.end(),
[&] (int128_t value) { return value >= 0 && value <= v; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromDecimal(0, 2)),
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromDecimal(v, 2)),
}));
}
{ // value >= v
// value <= 0.0
int count = count_if(values.begin(), values.end(),
[&] (int128_t value) { return value >= v && value <= 0; });
ASSERT_EQ(count, CountRows(table, {
table->NewComparisonPredicate("value",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromDecimal(v, 2)),
table->NewComparisonPredicate("value",
KuduPredicate::LESS_EQUAL,
KuduValue::FromDecimal(0, 2)),
}));
}
}
// IN list and IN Bloom filter predicates
std::mt19937 gen(SeedRandom());
std::shuffle(test_values.begin(), test_values.end(), gen);
for (auto end = test_values.begin(); end <= test_values.end(); end++) {
vector<KuduValue*> vals;
auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));
for (auto itr = test_values.begin(); itr != end; itr++) {
vals.push_back(KuduValue::FromDecimal(*itr, 2));
auto key = *itr;
bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
}
int count = CountMatchedRows<int128_t>(values, vector<int128_t>(test_values.begin(), end));
ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
CheckInBloomFilterPredicate(table, bloom_filter, count);
}
// IS NOT NULL predicate
ASSERT_EQ(values.size(),
CountRows(table, { table->NewIsNotNullPredicate("value") }));
// IS NULL predicate
ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}
class BloomFilterPredicateTest : public PredicateTest {
public:
BloomFilterPredicateTest() : rand_(Random(SeedRandom())) {}
protected:
shared_ptr<KuduTable> table_;
shared_ptr<KuduSession> session_;
Random rand_;
unordered_set<int32_t> all_values_;
int32_t min_value_, max_value_;
// Subset of "all_values_".
vector<int32_t> included_values_;
// Values that are not contained in "all_values_".
unordered_set<int32_t> excluded_values_;
// Number of false positives based on the number of values that'll be searched against.
int num_false_positive_values_;
// Initialize the members with the values that will be inserted to the table and Bloom filters.
// The table will have 'num_all_values' unique values, and we'll generate two Bloom filters:
// one with 'num_included_values' values from the table, and one with 'num_excluded_values'
// values that aren't present in the table.
void Init(int num_all_values, int num_included_values, int num_excluded_values) {
ASSERT_LE(num_included_values, num_all_values);
table_ = CreateAndOpenTable(KuduColumnSchema::INT32);
session_ = CreateSession();
const unordered_set<int32_t> empty_set;
all_values_ = CreateRandomUniqueIntegers<int32_t>(num_all_values, empty_set, &rand_);
auto min_max_pair = std::minmax_element(all_values_.begin(), all_values_.end());
min_value_ = *min_max_pair.first;
max_value_ = *min_max_pair.second;
ReservoirSample(all_values_, num_included_values, empty_set, &rand_, &included_values_);
excluded_values_ = CreateRandomUniqueIntegers<int32_t>(num_excluded_values, all_values_,
&rand_);
// NOLINTNEXTLINE(bugprone-narrowing-conversions)
num_false_positive_values_ = num_all_values * kBloomFilterFalsePositiveProb;
}
void InsertAllValuesInTable() {
int i = 0;
for (auto value : all_values_) {
unique_ptr<KuduInsert> insert(table_->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(insert->mutable_row()->SetInt32("value", value));
ASSERT_OK(session_->Apply(insert.release()));
}
ASSERT_OK(session_->Flush());
}
// Combine supplied Bloom filter predicates that contains included values
// with Range predicate.
void TestWithRangePredicate(KuduPredicate* included_predicate1,
KuduPredicate* included_predicate2) {
auto* less_predicate = table_->NewComparisonPredicate("value", KuduPredicate::LESS,
KuduValue::FromInt(min_value_));
int actual_count_less;
LOG_TIMING(INFO, Substitute("$0: Counting rows with a range predicate less than the min value",
CURRENT_TEST_NAME())) {
actual_count_less = CountRows(table_, {included_predicate1, less_predicate});
}
EXPECT_EQ(0, actual_count_less);
auto* ge_predicate = table_->NewComparisonPredicate("value", KuduPredicate::GREATER_EQUAL,
KuduValue::FromInt(min_value_));
auto* le_predicate = table_->NewComparisonPredicate("value", KuduPredicate::LESS_EQUAL,
KuduValue::FromInt(max_value_));
int actual_count_range;
LOG_TIMING(INFO, Substitute("$0: Counting rows with a range predicate that includes all values",
CURRENT_TEST_NAME())) {
actual_count_range = CountRows(table_,
{included_predicate2, ge_predicate, le_predicate});
}
EXPECT_LE(included_values_.size(), actual_count_range);
EXPECT_GE(included_values_.size() + num_false_positive_values_, actual_count_range);
}
};
TEST_F(BloomFilterPredicateTest, TestKuduBloomFilterPredicate) {
Init(100000/*num_all_values*/, 10000/*num_included_values*/, 10000/*num_excluded_values*/);
KuduBloomFilter* included_bf = CreateBloomFilterWithValues(included_values_);
KuduBloomFilter* excluded_bf = CreateBloomFilterWithValues(excluded_values_);
InsertAllValuesInTable();
vector<KuduBloomFilter*> included_bf_vec = { included_bf };
auto* included_predicate =
table_->NewInBloomFilterPredicate("value", &included_bf_vec);
auto* included_predicate_clone1 = included_predicate->Clone();
auto* included_predicate_clone2 = included_predicate->Clone();
ASSERT_TRUE(included_bf_vec.empty());
int actual_count_included = CountRows(table_, { included_predicate });
EXPECT_LE(included_values_.size(), actual_count_included);
EXPECT_GE(included_values_.size() + num_false_positive_values_, actual_count_included);
vector<KuduBloomFilter*> excluded_bf_vec = { excluded_bf };
auto* excluded_predicate =
table_->NewInBloomFilterPredicate("value", &excluded_bf_vec);
ASSERT_TRUE(excluded_bf_vec.empty());
int actual_count_excluded = CountRows(table_, { excluded_predicate });
EXPECT_LE(0, actual_count_excluded);
EXPECT_GE(num_false_positive_values_, actual_count_excluded);
// Combine Range predicate with Bloom filter predicate.
TestWithRangePredicate(included_predicate_clone1, included_predicate_clone2);
}
// Same as TestKuduBloomFilterPredicate above but using the overloaded NewInBloomFilterPredicate()
// client API with direct BlockBloomFilter pointer.
TEST_F(BloomFilterPredicateTest, TestDirectBlockBloomFilterPredicate) {
Init(100000/*num_all_values*/, 10000/*num_included_values*/, 10000/*num_excluded_values*/);
unique_ptr<BlockBloomFilter> included_bf = CreateDirectBloomFilterWithValues(included_values_);
unique_ptr<BlockBloomFilter> excluded_bf = CreateDirectBloomFilterWithValues(excluded_values_);
InsertAllValuesInTable();
vector<Slice> included_bf_vec =
{ Slice(reinterpret_cast<const uint8_t*>(included_bf.get()), sizeof(*included_bf)) };
const size_t included_bf_vec_size = included_bf_vec.size();
auto* included_predicate =
table_->NewInBloomFilterPredicate("value", included_bf_vec);
auto* included_predicate_clone1 = included_predicate->Clone();
auto* included_predicate_clone2 = included_predicate->Clone();
ASSERT_EQ(included_bf_vec_size, included_bf_vec.size());
int actual_count_included = CountRows(table_, { included_predicate });
EXPECT_LE(included_values_.size(), actual_count_included);
EXPECT_GE(included_values_.size() + num_false_positive_values_, actual_count_included);
vector<Slice> excluded_bf_vec =
{ Slice(reinterpret_cast<const uint8_t*>(excluded_bf.get()), sizeof(*excluded_bf)) };
const size_t excluded_bf_vec_size = excluded_bf_vec.size();
auto* excluded_predicate =
table_->NewInBloomFilterPredicate("value", excluded_bf_vec);
ASSERT_EQ(excluded_bf_vec_size, excluded_bf_vec.size());
int actual_count_excluded = CountRows(table_, { excluded_predicate });
EXPECT_LE(0, actual_count_excluded);
EXPECT_GE(num_false_positive_values_, actual_count_excluded);
// Combine Range predicate with Bloom filter predicate.
TestWithRangePredicate(included_predicate_clone1, included_predicate_clone2);
}
// Test to verify that an ineffective Bloom filter predicate will be disabled.
TEST_F(BloomFilterPredicateTest, TestDisabledBloomFilterPredicate) {
// Insert all values from the table in Bloom filter so that the predicate
// is determined to be ineffective.
Init(10000/*num_all_values*/, 10000/*num_included_values*/, 0/*num_excluded_values*/);
KuduBloomFilter* included_bf = CreateBloomFilterWithValues(included_values_);
InsertAllValuesInTable();
vector<KuduBloomFilter*> included_bf_vec = { included_bf };
auto* included_predicate =
table_->NewInBloomFilterPredicate("value", &included_bf_vec);
ASSERT_TRUE(included_bf_vec.empty());
FLAGS_predicate_effectivess_num_skip_blocks = 4;
int actual_count_included = CountRows(table_, { included_predicate });
ASSERT_EQ(included_values_.size(), actual_count_included);
// CountRows() runs 2 scans using a cloned predicate.
CheckDisabledPredicates(GetTheOnlyTabletId(), 2 /* expected_disabled_predicates */);
}
// Benchmark test that combines Bloom filter predicate with range predicate.
TEST_F(BloomFilterPredicateTest, TestKuduBloomFilterPredicateBenchmark) {
SKIP_IF_SLOW_NOT_ALLOWED();
// Writing large number of rows sometimes leads to write timeouts. Hence the test below
// uses 1M rows instead.
Init(1000000/*num_all_values*/, 10000/*num_included_values*/, 10000/*num_excluded_values*/);
KuduBloomFilter* included_bf = CreateBloomFilterWithValues(included_values_);
InsertAllValuesInTable();
vector<KuduBloomFilter*> included_bf_vec = { included_bf };
auto* included_predicate = table_->NewInBloomFilterPredicate("value", &included_bf_vec);
auto* included_predicate_clone = included_predicate->Clone();
TestWithRangePredicate(included_predicate, included_predicate_clone);
}
class ParameterizedBloomFilterPredicateTest :
public PredicateTest,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {};
INSTANTIATE_TEST_CASE_P(, ParameterizedBloomFilterPredicateTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
// Test to verify that an ineffective Bloom filter predicate will be disabled
// using a pattern of repeated strings.
TEST_P(ParameterizedBloomFilterPredicateTest, TestDisabledBloomFilterWithRepeatedStrings) {
// Combination of following 2 flags help test cases with MRSs flushed, DRS and delta files.
const bool flush_tablet = std::get<0>(GetParam());
const bool update_rows = std::get<1>(GetParam());
shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::STRING);
shared_ptr<KuduSession> session = CreateSession();
// Use a set of predetermined strings and populate the table.
// Create a BF predicate with same set of strings.
deque<string> values = {"Alice", "Bob", "Charlie", "Doug", "Elizabeth", "Frank", "George",
"Harry"};
// Populate table with a small set of strings that are repeated.
static constexpr int kNumRows = 10000;
auto upsert_rows = [&]() {
int i = 0;
while (i < kNumRows) {
for (int j = 0; j < values.size() && i < kNumRows; j++, i++) {
const string &value = values[j];
unique_ptr<KuduUpsert> upsert(table->NewUpsert());
ASSERT_OK(upsert->mutable_row()->SetInt64("key", i));
ASSERT_OK(upsert->mutable_row()->SetStringNoCopy("value", value));
ASSERT_OK(session->Apply(upsert.release()));
}
// TSAN builds timeout and fail on flushing with large number of rows.
if ((i % (kNumRows / 10)) == 0) {
ASSERT_OK(session->Flush());
}
}
ASSERT_OK(session->Flush());
};
upsert_rows();
const string tablet_id = GetTheOnlyTabletId();
if (flush_tablet) {
ASSERT_OK(cluster_->FlushTablet(tablet_id));
}
if (update_rows) {
string first_name = values.front();
values.pop_front();
values.push_back(first_name);
upsert_rows();
if (flush_tablet) {
ASSERT_OK(cluster_->FlushTablet(tablet_id));
}
}
// Create Bloom filter predicate with string values.
auto* bf = CreateBloomFilter(values.size());
for (const auto& value : values) {
bf->Insert(Slice(value));
}
vector<KuduBloomFilter*> bf_vec = { bf };
auto* bf_predicate = table->NewInBloomFilterPredicate("value", &bf_vec);
ASSERT_TRUE(bf_vec.empty());
FLAGS_predicate_effectivess_num_skip_blocks = 4;
int actual_count_included = DoCountRows(table, { bf_predicate });
ASSERT_EQ(kNumRows, actual_count_included);
CheckDisabledPredicates(tablet_id, 1 /* expected_disabled_predicates */);
}
class ParameterizedPredicateTest : public PredicateTest,
public ::testing::WithParamInterface<KuduColumnSchema::DataType> {};
INSTANTIATE_TEST_CASE_P(, ParameterizedPredicateTest,
::testing::Values(KuduColumnSchema::STRING,
KuduColumnSchema::BINARY,
KuduColumnSchema::VARCHAR));
TEST_P(ParameterizedPredicateTest, TestIndirectDataPredicates) {
shared_ptr<KuduTable> table = CreateAndOpenTable(GetParam());
shared_ptr<KuduSession> session = CreateSession();
vector<string> values = CreateStringValues();
int i = 0;
for (const string& value : values) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
switch (GetParam()) {
case KuduColumnSchema::STRING:
ASSERT_OK(insert->mutable_row()->SetStringNoCopy("value", value));
break;
case KuduColumnSchema::BINARY:
ASSERT_OK(insert->mutable_row()->SetBinaryNoCopy("value", value));
break;
case KuduColumnSchema::VARCHAR:
ASSERT_OK(insert->mutable_row()->SetVarchar("value", value));
break;
default:
LOG(FATAL) << "Invalid type";
break;
}
ASSERT_OK(session->Apply(insert.release()));
}
unique_ptr<KuduInsert> null_insert(table->NewInsert());
ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
ASSERT_OK(session->Apply(null_insert.release()));
ASSERT_OK(session->Flush());
CheckStringPredicates(table, values);
}
} // namespace client
} // namespace kudu