// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <initializer_list>
#include <iterator>
#include <limits>
#include <memory>
#include <ostream>
#include <random>
#include <string>
#include <tuple>
#include <type_traits>
#include <unordered_set>
#include <utility>
#include <vector>

#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_predicate.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/strings/escaping.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/util/block_bloom_filter.h"
#include "kudu/util/decimal_util.h"
#include "kudu/util/hash.pb.h"
#include "kudu/util/int128.h"
#include "kudu/util/metrics.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

DECLARE_int32(predicate_effectivess_num_skip_blocks);
METRIC_DECLARE_counter(scanner_predicates_disabled);
METRIC_DECLARE_entity(tablet);

using std::count_if;
using std::deque;
using std::numeric_limits;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;

namespace kudu {
namespace client {

using cluster::InternalMiniCluster;
using cluster::InternalMiniClusterOptions;
using sp::shared_ptr;

class PredicateTest : public KuduTest {

 protected:
  static constexpr float kBloomFilterFalsePositiveProb = 0.01;

  void SetUp() override {
    // Set up the mini cluster
    cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions()));
    ASSERT_OK(cluster_->Start());
    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
  }

  // Creates a table named "table" with a non-null INT64 primary key column
  // "key" and a "value" column of the specified type, then opens it and
  // returns the handle.
  //
  // VARCHAR value columns are given a length of 40; DECIMAL value columns use
  // precision kMaxDecimal128Precision and scale 2. The table is range
  // partitioned on "key" and has a single replica. Every step is wrapped in
  // CHECK_OK, so any failure aborts the process.
  shared_ptr<KuduTable> CreateAndOpenTable(KuduColumnSchema::DataType value_type) {
    KuduSchema schema;
    {
      KuduSchemaBuilder builder;
      builder.AddColumn("key")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();

      // Only VARCHAR and DECIMAL need extra type attributes.
      auto* value_col = builder.AddColumn("value")->Type(value_type);
      if (value_type == KuduColumnSchema::VARCHAR) {
        value_col->Length(40);
      } else if (value_type == KuduColumnSchema::DECIMAL) {
        value_col->Precision(kMaxDecimal128Precision)->Scale(2);
      }
      CHECK_OK(builder.Build(&schema));
    }

    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
    creator->table_name("table");
    creator->schema(&schema);
    creator->set_range_partition_columns({ "key" });
    creator->num_replicas(1);
    CHECK_OK(creator->Create());

    shared_ptr<KuduTable> opened;
    CHECK_OK(client_->OpenTable("table", &opened));
    return opened;
  }

  // Returns a fresh client session configured with a 10000 ms timeout and
  // the AUTO_FLUSH_BACKGROUND flush mode. Failure to set the flush mode is
  // fatal (CHECK_OK).
  shared_ptr<KuduSession> CreateSession() {
    constexpr int kTimeoutMillis = 10000;
    auto new_session = client_->NewSession();
    new_session->SetTimeoutMillis(kTimeoutMillis);
    CHECK_OK(new_session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
    return new_session;
  }

  // Count the rows in a table which satisfy the specified predicates.
  int DoCountRows(const shared_ptr<KuduTable>& table,
                  const vector<KuduPredicate*>& predicates) {
    KuduScanner scanner(table.get());
    for (KuduPredicate* predicate : predicates) {
      CHECK_OK(scanner.AddConjunctPredicate(predicate));
    }
    CHECK_OK(scanner.Open());

    int rows = 0;
    while (scanner.HasMoreRows()) {
      KuduScanBatch batch;
      CHECK_OK(scanner.NextBatch(&batch));
      rows += batch.NumRows();
    }
    return rows;
  }

  // Count the rows in a table which satisfy the specified predicates. This
  // also does a separate scan with cloned predicates, in order to test that
  // cloned predicates behave exactly as the original.
  int CountRows(const shared_ptr<KuduTable>& table,
                const vector<KuduPredicate*>& predicates) {

    vector<KuduPredicate*> cloned_predicates;
    cloned_predicates.reserve(predicates.size());
    for (KuduPredicate* pred : predicates) {
      cloned_predicates.push_back(pred->Clone());
    }

    int cloned_count = DoCountRows(table, cloned_predicates);
    int count = DoCountRows(table, predicates);
    CHECK_EQ(count, cloned_count);
    return count;
  }

  // Returns how many elements of 'values' compare equal (operator==) to at
  // least one element of 'test_values'. Duplicates in 'values' are each
  // counted; duplicates in 'test_values' have no effect.
  template <typename T>
  int CountMatchedRows(const vector<T>& values, const vector<T>& test_values) {
    const auto is_test_value = [&test_values](const T& candidate) {
      return std::find(test_values.begin(), test_values.end(), candidate) !=
             test_values.end();
    };
    return static_cast<int>(
        std::count_if(values.begin(), values.end(), is_test_value));
  }

  // Like CountMatchedRows(), but a match additionally requires the sign bits
  // to agree. Plain operator== treats -0.0 and 0.0 as equal; comparing the
  // sign bit as well tells them apart.
  template<typename F>
  int CountMatchedFloatingPointRowsWithSignBit(const vector<F>& values,
                                               const vector<F>& test_values) {
    static_assert(std::is_floating_point<F>::value,
                  "This function must only be used with floating point data types");
    int matched = 0;
    for (const F& stored : values) {
      for (const F& probe : test_values) {
        const bool same_value = (probe == stored);
        if (same_value && std::signbit(probe) == std::signbit(stored)) {
          // One hit is enough; each stored value is counted at most once.
          ++matched;
          break;
        }
      }
    }
    return matched;
  }


  // Returns the integer values stored in the test tables: every integer in
  // [-50, 50) in ascending order, followed by the four boundary values
  // min, min + 1, max - 1 and max of T (104 values in total).
  template <typename T>
  vector<T> CreateIntValues() {
    constexpr T kLowest = numeric_limits<T>::min();
    constexpr T kHighest = numeric_limits<T>::max();

    vector<T> result;
    result.reserve(104);
    for (int n = -50; n != 50; ++n) {
      result.push_back(static_cast<T>(n));
    }
    result.insert(result.end(), {
        kLowest,
        static_cast<T>(kLowest + 1),
        static_cast<T>(kHighest - 1),
        kHighest,
    });
    return result;
  }

  // Returns the integers used as predicate boundaries, in ascending order:
  // the type's extremes and their neighbours, the values just outside and at
  // the edges of the [-50, 50) range produced by CreateIntValues, and 0.
  template <typename T>
  vector<T> CreateIntTestValues() {
    constexpr T kLowest = numeric_limits<T>::min();
    constexpr T kHighest = numeric_limits<T>::max();
    return {
      kLowest,
      static_cast<T>(kLowest + 1),
      static_cast<T>(-51),
      static_cast<T>(-50),
      static_cast<T>(0),
      static_cast<T>(49),
      static_cast<T>(50),
      static_cast<T>(kHighest - 1),
      kHighest,
    };
  }

  // Returns the floating point values stored in the test tables (109 values):
  //  - i + i/100 for every integer i in [-50, 50), i.e. -50.50 up to and
  //    including 49.49;
  //  - followed, in ascending order, by -infinity, lowest(), -min(),
  //    -denorm_min(), -0.0, denorm_min(), min(), max() and +infinity.
  //
  // NaN is deliberately not part of the set yet.
  // TODO: add numeric_limits<T>::quiet_NaN() after fixing KUDU-1386
  template <typename T>
  vector<T> CreateFloatingPointValues() {
    using Limits = numeric_limits<T>;

    vector<T> result;
    result.reserve(109);
    for (int n = -50; n != 50; ++n) {
      const T whole = static_cast<T>(n);
      result.push_back(whole + whole / 100);
    }

    // Special values, listed in ascending order.
    const T specials[] = {
      -Limits::infinity(),
      Limits::lowest(),
      -Limits::min(),
      -Limits::denorm_min(),
      static_cast<T>(-0.0),
      Limits::denorm_min(),
      Limits::min(),
      Limits::max(),
      Limits::infinity(),
    };
    result.insert(result.end(), std::begin(specials), std::end(specials));
    return result;
  }

  /// Returns the floating point numbers used as predicate boundaries, in
  /// ascending order (16 values): both infinities, the finite extremes, the
  /// smallest normal and subnormal magnitudes on each side of zero, both
  /// signed zeros, and a few ordinary values (+/-1.0, +/-1.1, +/-100.0).
  ///
  /// NaN is deliberately not part of the set yet.
  /// TODO: add numeric_limits<T>::quiet_NaN() after fixing KUDU-1386
  template <typename T>
  vector<T> CreateFloatingPointTestValues() {
    using Limits = numeric_limits<T>;
    const T inf = Limits::infinity();
    const T smallest_normal = Limits::min();
    const T smallest_subnormal = Limits::denorm_min();
    return {
      -inf,
      Limits::lowest(),
      static_cast<T>(-100.0),
      static_cast<T>(-1.1),
      static_cast<T>(-1.0),
      -smallest_normal,
      -smallest_subnormal,
      static_cast<T>(-0.0),
      static_cast<T>(0.0),
      smallest_subnormal,
      smallest_normal,
      static_cast<T>(1.0),
      static_cast<T>(1.1),
      static_cast<T>(100.0),
      Limits::max(),
      inf,
    };
  }

  // Returns a vector of decimal(4, 2) numbers from -50.50 (inclusive) to 50.50
  // (exclusive) (100 values) and boundary values.
  vector<int128_t> CreateDecimalValues() {
    vector<int128_t> values;
    for (int i = -50; i < 50; i++) {
      values.push_back(i * 100 + i);
    }

    values.push_back(-9999);
    values.push_back(-9998);
    values.push_back(9998);
    values.push_back(9999);

    return values;
  }

  /// Returns a vector of decimal numbers for creating test predicates.
  vector<int128_t> CreateDecimalTestValues() {
    return {
        -9999,
        -9998,
        -5100,
        -5000,
        0,
        4900,
        5000,
        9998,
        9999,
    };
  }

  // Returns the string values stored in the test tables, in ascending order:
  // the empty string plus short strings built from 'a' and embedded NUL
  // bytes. Lengths are passed explicitly so the NUL bytes are kept.
  vector<string> CreateStringValues() {
    vector<string> result;
    result.reserve(7);
    result.emplace_back();
    result.emplace_back("\0", 1);
    result.emplace_back("\0\0", 2);
    result.emplace_back("a", 1);
    result.emplace_back("a\0", 2);
    result.emplace_back("a\0a", 3);
    result.emplace_back("aa\0", 3);
    return result;
  }

  // Builds an empty client-side Bloom filter for 'nkeys' keys with the given
  // false positive probability (kBloomFilterFalsePositiveProb by default).
  // A build failure is fatal (CHECK_OK).
  //
  // NOTE(review): a raw pointer is returned; the callers in this file hand it
  // to NewInBloomFilterPredicate(), which presumably takes ownership --
  // confirm against the client API.
  static KuduBloomFilter* CreateBloomFilter(
      int nkeys,
      float false_positive_prob = kBloomFilterFalsePositiveProb) {
    KuduBloomFilter* filter = nullptr;
    KuduBloomFilterBuilder filter_builder(nkeys);
    filter_builder.false_positive_probability(false_positive_prob);
    CHECK_OK(filter_builder.Build(&filter));
    return filter;
  }

  // Builds an empty BlockBloomFilter directly (bypassing the client-side
  // builder), backed by the default buffer allocator singleton. It is sized
  // with BlockBloomFilter::MinLogSpace() for 'nkeys' keys at the given false
  // positive probability, and initialized with FAST_HASH and a 0 argument.
  // An initialization failure is fatal (CHECK_OK).
  static unique_ptr<BlockBloomFilter> CreateDirectBloomFilter(
      int nkeys,
      float false_positive_prob = kBloomFilterFalsePositiveProb) {
    const auto log_space = BlockBloomFilter::MinLogSpace(nkeys, false_positive_prob);
    unique_ptr<BlockBloomFilter> filter(
        new BlockBloomFilter(DefaultBlockBloomFilterBufferAllocator::GetSingleton()));
    CHECK_OK(filter->Init(log_space, kudu::FAST_HASH, 0));
    return filter;
  }

  // Inserts every element of 'values' into 'bloom_filter'. Each element is
  // inserted as the raw bytes of its in-memory representation (address of the
  // element, sizeof(element) bytes), so this is only meaningful for
  // fixed-size value types -- not for std::string, for example.
  template<typename BloomFilterType, typename Collection>
  static void InsertValuesInBloomFilter(BloomFilterType* bloom_filter, const Collection& values) {
    for (const auto& element : values) {
      const auto* raw_bytes = reinterpret_cast<const uint8_t*>(&element);
      bloom_filter->Insert(Slice(raw_bytes, sizeof(element)));
    }
  }

  // Returns a client-side Bloom filter sized for values.size() keys (default
  // false positive probability) and populated with every element of 'values'.
  template<class Collection>
  static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& values) {
    KuduBloomFilter* populated = CreateBloomFilter(values.size());
    InsertValuesInBloomFilter(populated, values);
    return populated;
  }

  // Returns a BlockBloomFilter sized for values.size() keys (default false
  // positive probability) and populated with every element of 'values'.
  template<class Collection>
  static unique_ptr<BlockBloomFilter> CreateDirectBloomFilterWithValues(const Collection& values) {
    auto populated = CreateDirectBloomFilter(values.size());
    InsertValuesInBloomFilter(populated.get(), values);
    return populated;
  }

  void CheckInBloomFilterPredicate(const shared_ptr<KuduTable>& table,
                                   KuduBloomFilter* in_bloom_filter,
                                   int expected_count) {
    vector<KuduBloomFilter*> bf_vec = { in_bloom_filter };
    auto* bf_predicate = table->NewInBloomFilterPredicate("value", &bf_vec);
    ASSERT_TRUE(bf_vec.empty());
    ASSERT_EQ(expected_count, CountRows(table, { bf_predicate }));
  }

  // Check integer predicates against the specified table. The table must have
  // key/value rows with values from CreateIntValues, plus one null value.
  template <typename T>
  void CheckIntPredicates(const shared_ptr<KuduTable>& table) {
    vector<T> values = CreateIntValues<T>();
    vector<T> test_values = CreateIntTestValues<T>();
    ASSERT_EQ(values.size() + 1, CountRows(table, {}));

    for (T v : test_values) {
      SCOPED_TRACE(Substitute("test value: $0", v));

      { // value = v
        int count = count_if(values.begin(), values.end(),
                             [&] (T value) { return value == v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::EQUAL,
                                            KuduValue::FromInt(v)),
        }));
      }

      { // value >= v
        int count = count_if(values.begin(), values.end(),
                             [&] (T value) { return value >= v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::GREATER_EQUAL,
                                            KuduValue::FromInt(v)),
        }));
      }

      { // value <= v
        int count = count_if(values.begin(), values.end(),
                             [&] (T value) { return value <= v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::LESS_EQUAL,
                                            KuduValue::FromInt(v)),
        }));
      }

      { // value > v
        int count = count_if(values.begin(), values.end(),
                             [&] (T value) { return value > v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::GREATER,
                                            KuduValue::FromInt(v)),
        }));
      }

      { // value < v
        int count = count_if(values.begin(), values.end(),
                             [&] (T value) { return value < v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::LESS,
                                            KuduValue::FromInt(v)),
        }));
      }

      { // value >= 0
        // value <= v
        int count = count_if(values.begin(), values.end(),
                             [&] (T value) { return value >= 0 && value <= v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::GREATER_EQUAL,
                                            KuduValue::FromInt(0)),
              table->NewComparisonPredicate("value",
                                            KuduPredicate::LESS_EQUAL,
                                            KuduValue::FromInt(v)),
        }));
      }

      { // value >= v
        // value <= 0
        int count = count_if(values.begin(), values.end(),
                             [&] (T value) { return value >= v && value <= 0; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::GREATER_EQUAL,
                                            KuduValue::FromInt(v)),
              table->NewComparisonPredicate("value",
                                            KuduPredicate::LESS_EQUAL,
                                            KuduValue::FromInt(0)),
        }));
      }
    }

    // IN list and IN Bloom filter predicates
    std::mt19937 gen(SeedRandom());
    std::shuffle(test_values.begin(), test_values.end(), gen);

    for (auto end = test_values.begin(); end <= test_values.end(); end++) {
      vector<KuduValue*> vals;
      auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));

      for (auto itr = test_values.begin(); itr != end; itr++) {
        vals.push_back(KuduValue::FromInt(*itr));
        auto key = *itr;
        bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
      }

      int count = CountMatchedRows<T>(values, vector<T>(test_values.begin(), end));
      ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
      CheckInBloomFilterPredicate(table, bloom_filter, count);
    }

    // IS NOT NULL predicate
    ASSERT_EQ(CountRows(table, {}) - 1,
              CountRows(table, { table->NewIsNotNullPredicate("value") }));

    // IS NULL predicate
    ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
  }

  // Check string predicates against the specified table.
  void CheckStringPredicates(const shared_ptr<KuduTable>& table, const vector<string>& values) {

    ASSERT_EQ(values.size() + 1, CountRows(table, {}));

    // Add some additional values to check against.
    vector<string> test_values = values;
    test_values.emplace_back("aa");
    test_values.emplace_back("\1", 1);
    test_values.emplace_back("a\1", 1);

    for (const string& v : test_values) {
      SCOPED_TRACE(Substitute("test value: '$0'", strings::CHexEscape(v)));

      { // value = v
        int count = count_if(values.begin(), values.end(),
                             [&] (const string& value) { return value == v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::EQUAL,
                                            KuduValue::CopyString(v)),
        }));
      }

      { // value >= v
        int count = count_if(values.begin(), values.end(),
                             [&] (const string& value) { return value >= v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::GREATER_EQUAL,
                                            KuduValue::CopyString(v)),
        }));
      }

      { // value <= v
        int count = count_if(values.begin(), values.end(),
                             [&] (const string& value) { return value <= v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::LESS_EQUAL,
                                            KuduValue::CopyString(v)),
        }));
      }

      { // value > v
        int count = count_if(values.begin(), values.end(),
                             [&] (const string& value) { return value > v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::GREATER,
                                            KuduValue::CopyString(v)),
        }));
      }

      { // value < v
        int count = count_if(values.begin(), values.end(),
                             [&] (const string& value) { return value < v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::LESS,
                                            KuduValue::CopyString(v)),
        }));
      }

      { // value >= "a"
        // value <= v
        int count = count_if(values.begin(), values.end(),
                             [&] (const string& value) { return value >= "a" && value <= v; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::GREATER_EQUAL,
                                            KuduValue::CopyString("a")),
              table->NewComparisonPredicate("value",
                                            KuduPredicate::LESS_EQUAL,
                                            KuduValue::CopyString(v)),
        }));
      }

      { // value >= v
        // value <= "a"
        int count = count_if(values.begin(), values.end(),
                             [&] (const string& value) { return value >= v && value <= "a"; });
        ASSERT_EQ(count, CountRows(table, {
              table->NewComparisonPredicate("value",
                                            KuduPredicate::GREATER_EQUAL,
                                            KuduValue::CopyString(v)),
              table->NewComparisonPredicate("value",
                                            KuduPredicate::LESS_EQUAL,
                                            KuduValue::CopyString("a")),
        }));
      }
    }

    // IN list and IN Bloom filter predicates
    std::mt19937 gen(SeedRandom());
    std::shuffle(test_values.begin(), test_values.end(), gen);

    for (auto end = test_values.begin(); end <= test_values.end(); end++) {
      vector<KuduValue*> vals;
      auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));

      for (auto itr = test_values.begin(); itr != end; itr++) {
        vals.push_back(KuduValue::CopyString(*itr));
        bloom_filter->Insert(*itr);
      }

      int count = CountMatchedRows<string>(values, vector<string>(test_values.begin(), end));
      ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
      CheckInBloomFilterPredicate(table, bloom_filter, count);
    }

    // IS NOT NULL predicate
    ASSERT_EQ(CountRows(table, {}) - 1,
              CountRows(table, { table->NewIsNotNullPredicate("value") }));

    // IS NULL predicate
    ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
  }

  // Returns the id of the single tablet hosted by the single tablet server
  // of the cluster. Aborts (CHECK_EQ) if the cluster does not have exactly
  // one tablet server or that server does not list exactly one tablet.
  string GetTheOnlyTabletId() {
    CHECK_EQ(1, cluster_->num_tablet_servers());
    const auto tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
    CHECK_EQ(1, tablet_ids.size());
    return tablet_ids[0];
  }

  // Asserts that the 'scanner_predicates_disabled' counter of tablet
  // 'tablet_id' equals 'expected_disabled_predicates'. The metric is fetched
  // through the bound HTTP address of the cluster's only tablet server.
  void CheckDisabledPredicates(const string& tablet_id, int64_t expected_disabled_predicates) {
    // The lookup below hard-codes tablet server 0, so there must be just one.
    ASSERT_EQ(1, cluster_->num_tablet_servers());
    const auto* tserver = cluster_->mini_tablet_server(0);

    int64_t actual_disabled_predicates = 0;
    ASSERT_OK(itest::GetInt64Metric(HostPort(tserver->bound_http_addr()),
                                    &METRIC_ENTITY_tablet,
                                    tablet_id.c_str(),
                                    &METRIC_scanner_predicates_disabled,
                                    "value",
                                    &actual_disabled_predicates));
    ASSERT_EQ(expected_disabled_predicates, actual_disabled_predicates);
  }

  shared_ptr<KuduClient> client_;
  unique_ptr<InternalMiniCluster> cluster_;
};

// Exercises comparison, IN-list, IN Bloom filter, IS NULL and IS NOT NULL
// predicates on a BOOL value column holding three rows: false, true and one
// row whose value is left unset.
TEST_F(PredicateTest, TestBoolPredicates) {
  const shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::BOOL);
  const shared_ptr<KuduSession> session = CreateSession();

  int64_t next_key = 0;
  for (const bool value : { false, true }) {
    unique_ptr<KuduInsert> row(table->NewInsert());
    ASSERT_OK(row->mutable_row()->SetInt64("key", next_key++));
    ASSERT_OK(row->mutable_row()->SetBool("value", value));
    ASSERT_OK(session->Apply(row.release()));
  }

  // Insert null value: only the key is set, the value column is left unset.
  unique_ptr<KuduInsert> null_row(table->NewInsert());
  ASSERT_OK(null_row->mutable_row()->SetInt64("key", next_key));
  ASSERT_OK(session->Apply(null_row.release()));
  ASSERT_OK(session->Flush());

  ASSERT_EQ(3, CountRows(table, {}));

  // Counts the rows matching "value <op> bound".
  const auto count_cmp = [&](KuduPredicate::ComparisonOp op, bool bound) {
    return CountRows(table, {
        table->NewComparisonPredicate("value", op, KuduValue::FromBool(bound)),
    });
  };

  ASSERT_EQ(1, count_cmp(KuduPredicate::EQUAL, false));          // value = false
  ASSERT_EQ(1, count_cmp(KuduPredicate::EQUAL, true));           // value = true
  ASSERT_EQ(1, count_cmp(KuduPredicate::GREATER_EQUAL, true));   // value >= true
  ASSERT_EQ(2, count_cmp(KuduPredicate::GREATER_EQUAL, false));  // value >= false
  ASSERT_EQ(1, count_cmp(KuduPredicate::LESS_EQUAL, false));     // value <= false
  ASSERT_EQ(2, count_cmp(KuduPredicate::LESS_EQUAL, true));      // value <= true
  ASSERT_EQ(0, count_cmp(KuduPredicate::GREATER, true));         // value > true
  ASSERT_EQ(1, count_cmp(KuduPredicate::GREATER, false));        // value > false
  ASSERT_EQ(0, count_cmp(KuduPredicate::LESS, false));           // value < false
  ASSERT_EQ(1, count_cmp(KuduPredicate::LESS, true));            // value < true

  // Counts the rows matching "value IN (members...)".
  const auto count_in = [&](std::initializer_list<bool> members) {
    vector<KuduValue*> in_list;
    for (const bool member : members) {
      in_list.push_back(KuduValue::FromBool(member));
    }
    return CountRows(table, { table->NewInListPredicate("value", &in_list) });
  };

  ASSERT_EQ(0, count_in({}));                // value IN ()
  ASSERT_EQ(1, count_in({ true }));          // value IN (true)
  ASSERT_EQ(1, count_in({ false }));         // value IN (false)
  ASSERT_EQ(2, count_in({ false, true }));   // value IN (true, false)

  // Builds a Bloom filter sized for, and containing, exactly 'keys'.
  const auto bloom_filter_of = [](std::initializer_list<bool> keys) {
    auto* filter = CreateBloomFilter(static_cast<int>(keys.size()));
    for (const bool key : keys) {
      filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
    }
    return filter;
  };

  // empty BloomFilter
  CheckInBloomFilterPredicate(table, bloom_filter_of({}), 0);

  { // vector with no BloomFilters
    vector<KuduBloomFilter*> no_bloom_filters = {};
    auto* bf_predicate = table->NewInBloomFilterPredicate("value", &no_bloom_filters);
    ASSERT_TRUE(no_bloom_filters.empty());
    KuduScanner scanner(table.get());
    const Status s = scanner.AddConjunctPredicate(bf_predicate);
    ASSERT_TRUE(s.IsInvalidArgument());
    ASSERT_STR_CONTAINS(s.ToString(), "No Bloom filters supplied");
  }

  // BloomFilter with (true)
  CheckInBloomFilterPredicate(table, bloom_filter_of({ true }), 1);

  // BloomFilter with (false)
  CheckInBloomFilterPredicate(table, bloom_filter_of({ false }), 1);

  // BloomFilter with (true, false)
  CheckInBloomFilterPredicate(table, bloom_filter_of({ true, false }), 2);

  // IS NOT NULL predicate
  ASSERT_EQ(CountRows(table, {}) - 1,
            CountRows(table, { table->NewIsNotNullPredicate("value") }));

  // IS NULL predicate
  ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}

// Fills an INT8 value column with the values from CreateIntValues<int8_t>()
// plus one null row, then runs the shared integer predicate checks.
TEST_F(PredicateTest, TestInt8Predicates) {
  const shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT8);
  const shared_ptr<KuduSession> session = CreateSession();

  // Keys are assigned sequentially in insertion order.
  int64_t next_key = 0;
  for (const int8_t value : CreateIntValues<int8_t>()) {
    unique_ptr<KuduInsert> row(table->NewInsert());
    ASSERT_OK(row->mutable_row()->SetInt64("key", next_key++));
    ASSERT_OK(row->mutable_row()->SetInt8("value", value));
    ASSERT_OK(session->Apply(row.release()));
  }

  unique_ptr<KuduInsert> null_row(table->NewInsert());
  ASSERT_OK(null_row->mutable_row()->SetInt64("key", next_key));
  ASSERT_OK(null_row->mutable_row()->SetNull("value"));
  ASSERT_OK(session->Apply(null_row.release()));
  ASSERT_OK(session->Flush());

  CheckIntPredicates<int8_t>(table);
}

// Fills an INT16 value column with the values from CreateIntValues<int16_t>()
// plus one null row, then runs the shared integer predicate checks.
TEST_F(PredicateTest, TestInt16Predicates) {
  const shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT16);
  const shared_ptr<KuduSession> session = CreateSession();

  // Keys are assigned sequentially in insertion order.
  int64_t next_key = 0;
  for (const int16_t value : CreateIntValues<int16_t>()) {
    unique_ptr<KuduInsert> row(table->NewInsert());
    ASSERT_OK(row->mutable_row()->SetInt64("key", next_key++));
    ASSERT_OK(row->mutable_row()->SetInt16("value", value));
    ASSERT_OK(session->Apply(row.release()));
  }

  unique_ptr<KuduInsert> null_row(table->NewInsert());
  ASSERT_OK(null_row->mutable_row()->SetInt64("key", next_key));
  ASSERT_OK(null_row->mutable_row()->SetNull("value"));
  ASSERT_OK(session->Apply(null_row.release()));
  ASSERT_OK(session->Flush());

  CheckIntPredicates<int16_t>(table);
}

// Fills an INT32 value column with the values from CreateIntValues<int32_t>()
// plus one null row, then runs the shared integer predicate checks.
TEST_F(PredicateTest, TestInt32Predicates) {
  const shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT32);
  const shared_ptr<KuduSession> session = CreateSession();

  // Keys are assigned sequentially in insertion order.
  int64_t next_key = 0;
  for (const int32_t value : CreateIntValues<int32_t>()) {
    unique_ptr<KuduInsert> row(table->NewInsert());
    ASSERT_OK(row->mutable_row()->SetInt64("key", next_key++));
    ASSERT_OK(row->mutable_row()->SetInt32("value", value));
    ASSERT_OK(session->Apply(row.release()));
  }

  unique_ptr<KuduInsert> null_row(table->NewInsert());
  ASSERT_OK(null_row->mutable_row()->SetInt64("key", next_key));
  ASSERT_OK(null_row->mutable_row()->SetNull("value"));
  ASSERT_OK(session->Apply(null_row.release()));
  ASSERT_OK(session->Flush());

  CheckIntPredicates<int32_t>(table);
}

// Fills an INT64 value column with the values from CreateIntValues<int64_t>()
// plus one null row, then runs the shared integer predicate checks.
TEST_F(PredicateTest, TestInt64Predicates) {
  const shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT64);
  const shared_ptr<KuduSession> session = CreateSession();

  // Keys are assigned sequentially in insertion order.
  int64_t next_key = 0;
  for (const int64_t value : CreateIntValues<int64_t>()) {
    unique_ptr<KuduInsert> row(table->NewInsert());
    ASSERT_OK(row->mutable_row()->SetInt64("key", next_key++));
    ASSERT_OK(row->mutable_row()->SetInt64("value", value));
    ASSERT_OK(session->Apply(row.release()));
  }

  unique_ptr<KuduInsert> null_row(table->NewInsert());
  ASSERT_OK(null_row->mutable_row()->SetInt64("key", next_key));
  ASSERT_OK(null_row->mutable_row()->SetNull("value"));
  ASSERT_OK(session->Apply(null_row.release()));
  ASSERT_OK(session->Flush());

  CheckIntPredicates<int64_t>(table);
}

// Populates a UNIXTIME_MICROS "value" column with every value from
// CreateIntValues<int64_t>() plus one NULL row, then hands the table to
// CheckIntPredicates<int64_t>().
TEST_F(PredicateTest, TestTimestampPredicates) {
  shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::UNIXTIME_MICROS);
  shared_ptr<KuduSession> session = CreateSession();

  // Primary keys are handed out sequentially, one per inserted row.
  int next_key = 0;
  for (int64_t micros : CreateIntValues<int64_t>()) {
    unique_ptr<KuduInsert> op(table->NewInsert());
    auto* row = op->mutable_row();
    ASSERT_OK(row->SetInt64("key", next_key++));
    ASSERT_OK(row->SetUnixTimeMicros("value", micros));
    ASSERT_OK(session->Apply(op.release()));
  }

  // One additional row whose "value" cell is NULL.
  unique_ptr<KuduInsert> null_op(table->NewInsert());
  auto* null_row = null_op->mutable_row();
  ASSERT_OK(null_row->SetInt64("key", next_key++));
  ASSERT_OK(null_row->SetNull("value"));
  ASSERT_OK(session->Apply(null_op.release()));
  ASSERT_OK(session->Flush());

  CheckIntPredicates<int64_t>(table);
}

// Populates a DATE "value" column with every value from
// CreateIntValues<int32_t>() plus one NULL row, then hands the table to
// CheckIntPredicates<int32_t>().
TEST_F(PredicateTest, TestDatePredicates) {
  shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::DATE);
  shared_ptr<KuduSession> session = CreateSession();

  // Primary keys are handed out sequentially, one per inserted row.
  int next_key = 0;
  for (int32_t days : CreateIntValues<int32_t>()) {
    unique_ptr<KuduInsert> op(table->NewInsert());
    auto* row = op->mutable_row();
    ASSERT_OK(row->SetInt64("key", next_key++));
    ASSERT_OK(row->SetDate("value", days));
    ASSERT_OK(session->Apply(op.release()));
  }

  // One additional row whose "value" cell is NULL.
  unique_ptr<KuduInsert> null_op(table->NewInsert());
  auto* null_row = null_op->mutable_row();
  ASSERT_OK(null_row->SetInt64("key", next_key++));
  ASSERT_OK(null_row->SetNull("value"));
  ASSERT_OK(session->Apply(null_op.release()));
  ASSERT_OK(session->Flush());

  CheckIntPredicates<int32_t>(table);
}

// Exercises scan predicates on a FLOAT "value" column.
//
// The table is filled with CreateFloatingPointValues<float>() plus one NULL
// row. For every value in CreateFloatingPointTestValues<float>() the row count
// returned by each comparison predicate (and by two-sided ranges anchored at
// 0.0) is compared against the count computed locally with count_if(). IN-list
// and IN-Bloom-filter predicates are then checked for every prefix of the
// shuffled test values, followed by IS NOT NULL and IS NULL.
TEST_F(PredicateTest, TestFloatPredicates) {
  shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::FLOAT);
  shared_ptr<KuduSession> session = CreateSession();

  vector<float> values = CreateFloatingPointValues<float>();
  vector<float> test_values = CreateFloatingPointTestValues<float>();

  int i = 0;
  for (float value : values) {
    unique_ptr<KuduInsert> insert(table->NewInsert());
    ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
    ASSERT_OK(insert->mutable_row()->SetFloat("value", value));
    ASSERT_OK(session->Apply(insert.release()));
  }
  // One extra row with a NULL "value" cell.
  unique_ptr<KuduInsert> null_insert(table->NewInsert());
  ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
  ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
  ASSERT_OK(session->Apply(null_insert.release()));
  ASSERT_OK(session->Flush());

  // A scan without predicates returns every row, including the NULL one.
  ASSERT_EQ(values.size() + 1, CountRows(table, {}));

  for (float v : test_values) {
    SCOPED_TRACE(Substitute("test value: $0", v));

    { // value = v
      int count = count_if(values.begin(), values.end(),
                           [&] (float value) { return value == v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value", KuduPredicate::EQUAL, KuduValue::FromFloat(v)),
      }));
    }

    { // value >= v
      int count = count_if(values.begin(), values.end(),
                           [&] (float value) { return value >= v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::GREATER_EQUAL,
                                          KuduValue::FromFloat(v)),
      }));
    }

    { // value <= v
      int count = count_if(values.begin(), values.end(),
                           [&] (float value) { return value <= v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::LESS_EQUAL,
                                          KuduValue::FromFloat(v)),
      }));
    }

    { // value > v
      int count = count_if(values.begin(), values.end(),
                           [&] (float value) { return value > v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::GREATER,
                                          KuduValue::FromFloat(v)),
      }));
    }

    { // value < v
      int count = count_if(values.begin(), values.end(),
                           [&] (float value) { return value < v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::LESS,
                                          KuduValue::FromFloat(v)),
      }));
    }

    { // value >= 0.0
      // value <= v
      int count = count_if(values.begin(), values.end(),
                           [&] (float value) { return value >= 0.0 && value <= v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::GREATER_EQUAL,
                                          KuduValue::FromFloat(0.0)),
            table->NewComparisonPredicate("value",
                                          KuduPredicate::LESS_EQUAL,
                                          KuduValue::FromFloat(v)),
      }));
    }

    { // value >= v
      // value <= 0.0
      int count = count_if(values.begin(), values.end(),
                           [&] (float value) { return value >= v && value <= 0.0; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::GREATER_EQUAL,
                                          KuduValue::FromFloat(v)),
            table->NewComparisonPredicate("value",
                                          KuduPredicate::LESS_EQUAL,
                                          KuduValue::FromFloat(0.0)),
      }));
    }
  }

  // IN list and IN Bloom filter predicates
  std::mt19937 gen(SeedRandom());
  std::shuffle(test_values.begin(), test_values.end(), gen);

  // Check every prefix of the shuffled test values, from the empty prefix up
  // to the complete list. The prefix length drives the loop so that no
  // iterator is ever advanced beyond test_values.end().
  for (size_t prefix_len = 0; prefix_len <= test_values.size(); ++prefix_len) {
    const auto end = test_values.begin() + static_cast<std::ptrdiff_t>(prefix_len);
    vector<KuduValue*> vals;
    auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));

    for (auto itr = test_values.begin(); itr != end; ++itr) {
      vals.push_back(KuduValue::FromFloat(*itr));
      // The Bloom filter is keyed on the raw bytes of the value.
      auto key = *itr;
      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
    }

    const vector<float> prefix(test_values.begin(), end);
    int count = CountMatchedRows(values, prefix);
    ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
    // The Bloom filter expectation comes from a separate, sign-bit-aware helper.
    int count_with_sign_bit = CountMatchedFloatingPointRowsWithSignBit(values, prefix);
    CheckInBloomFilterPredicate(table, bloom_filter, count_with_sign_bit);
  }

  // IS NOT NULL predicate
  ASSERT_EQ(values.size(),
            CountRows(table, { table->NewIsNotNullPredicate("value") }));

  // IS NULL predicate
  ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}

// Exercises scan predicates on a DOUBLE "value" column.
//
// The table is filled with CreateFloatingPointValues<double>() plus one NULL
// row. For every value in CreateFloatingPointTestValues<double>() the row count
// returned by each comparison predicate (and by two-sided ranges anchored at
// 0.0) is compared against the count computed locally with count_if(). IN-list
// and IN-Bloom-filter predicates are then checked for every prefix of the
// shuffled test values, followed by IS NOT NULL and IS NULL.
TEST_F(PredicateTest, TestDoublePredicates) {
  shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::DOUBLE);
  shared_ptr<KuduSession> session = CreateSession();

  vector<double> values = CreateFloatingPointValues<double>();
  vector<double> test_values = CreateFloatingPointTestValues<double>();

  int i = 0;
  for (double value : values) {
    unique_ptr<KuduInsert> insert(table->NewInsert());
    ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
    ASSERT_OK(insert->mutable_row()->SetDouble("value", value));
    ASSERT_OK(session->Apply(insert.release()));
  }
  // One extra row with a NULL "value" cell.
  unique_ptr<KuduInsert> null_insert(table->NewInsert());
  ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
  ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
  ASSERT_OK(session->Apply(null_insert.release()));
  ASSERT_OK(session->Flush());

  // A scan without predicates returns every row, including the NULL one.
  ASSERT_EQ(values.size() + 1, CountRows(table, {}));

  for (double v : test_values) {
    SCOPED_TRACE(Substitute("test value: $0", v));

    { // value = v
      int count = count_if(values.begin(), values.end(),
                           [&] (double value) { return value == v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value", KuduPredicate::EQUAL, KuduValue::FromDouble(v)),
      }));
    }

    { // value >= v
      int count = count_if(values.begin(), values.end(),
                           [&] (double value) { return value >= v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::GREATER_EQUAL,
                                          KuduValue::FromDouble(v)),
      }));
    }

    { // value <= v
      int count = count_if(values.begin(), values.end(),
                           [&] (double value) { return value <= v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::LESS_EQUAL,
                                          KuduValue::FromDouble(v)),
      }));
    }

    { // value > v
      int count = count_if(values.begin(), values.end(),
                           [&] (double value) { return value > v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::GREATER,
                                          KuduValue::FromDouble(v)),
      }));
    }

    { // value < v
      int count = count_if(values.begin(), values.end(),
                           [&] (double value) { return value < v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::LESS,
                                          KuduValue::FromDouble(v)),
      }));
    }

    { // value >= 0.0
      // value <= v
      int count = count_if(values.begin(), values.end(),
                           [&] (double value) { return value >= 0.0 && value <= v; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::GREATER_EQUAL,
                                          KuduValue::FromDouble(0.0)),
            table->NewComparisonPredicate("value",
                                          KuduPredicate::LESS_EQUAL,
                                          KuduValue::FromDouble(v)),
      }));
    }

    { // value >= v
      // value <= 0.0
      int count = count_if(values.begin(), values.end(),
                           [&] (double value) { return value >= v && value <= 0.0; });
      ASSERT_EQ(count, CountRows(table, {
            table->NewComparisonPredicate("value",
                                          KuduPredicate::GREATER_EQUAL,
                                          KuduValue::FromDouble(v)),
            table->NewComparisonPredicate("value",
                                          KuduPredicate::LESS_EQUAL,
                                          KuduValue::FromDouble(0.0)),
      }));
    }
  }

  // IN list and IN Bloom filter predicates
  std::mt19937 gen(SeedRandom());
  std::shuffle(test_values.begin(), test_values.end(), gen);

  // Check every prefix of the shuffled test values, from the empty prefix up
  // to the complete list. The prefix length drives the loop so that no
  // iterator is ever advanced beyond test_values.end().
  for (size_t prefix_len = 0; prefix_len <= test_values.size(); ++prefix_len) {
    const auto end = test_values.begin() + static_cast<std::ptrdiff_t>(prefix_len);
    vector<KuduValue*> vals;
    auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));

    for (auto itr = test_values.begin(); itr != end; ++itr) {
      vals.push_back(KuduValue::FromDouble(*itr));
      // The Bloom filter is keyed on the raw bytes of the value.
      auto key = *itr;
      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
    }

    const vector<double> prefix(test_values.begin(), end);
    int count = CountMatchedRows(values, prefix);
    ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
    // The Bloom filter expectation comes from a separate, sign-bit-aware helper.
    int count_with_sign_bit = CountMatchedFloatingPointRowsWithSignBit(values, prefix);
    CheckInBloomFilterPredicate(table, bloom_filter, count_with_sign_bit);
  }

  // IS NOT NULL predicate
  ASSERT_EQ(values.size(),
            CountRows(table, { table->NewIsNotNullPredicate("value") }));

  // IS NULL predicate
  ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}

// Exercises scan predicates on a DECIMAL "value" column with precision
// kMaxDecimal128Precision and scale 2.
//
// The table (range-partitioned on "key", one replica) is filled with
// CreateDecimalValues() as unscaled decimals plus one NULL row. For every
// value in CreateDecimalTestValues() the row count returned by each comparison
// predicate (and by two-sided ranges anchored at 0) is compared against the
// count computed locally with count_if(). IN-list and IN-Bloom-filter
// predicates are then checked for every prefix of the shuffled test values,
// followed by IS NOT NULL and IS NULL.
TEST_F(PredicateTest, TestDecimalPredicates) {
  KuduSchema schema;
  {
    KuduSchemaBuilder builder;
    builder.AddColumn("key")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
    builder.AddColumn("value")->Type(KuduColumnSchema::DECIMAL)
        ->Precision(kMaxDecimal128Precision)->Scale(2);
    CHECK_OK(builder.Build(&schema));
  }
  unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
  CHECK_OK(table_creator->table_name("table")
               .schema(&schema)
               .set_range_partition_columns({ "key" })
               .num_replicas(1)
               .Create());

  shared_ptr<KuduTable> table;
  CHECK_OK(client_->OpenTable("table", &table));

  shared_ptr<KuduSession> session = CreateSession();

  vector<int128_t> values = CreateDecimalValues();
  vector<int128_t> test_values = CreateDecimalTestValues();

  int i = 0;
  for (int128_t value : values) {
    unique_ptr<KuduInsert> insert(table->NewInsert());
    ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
    ASSERT_OK(insert->mutable_row()->SetUnscaledDecimal("value", value));
    ASSERT_OK(session->Apply(insert.release()));
  }
  // One extra row with a NULL "value" cell.
  unique_ptr<KuduInsert> null_insert(table->NewInsert());
  ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
  ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
  ASSERT_OK(session->Apply(null_insert.release()));
  ASSERT_OK(session->Flush());

  // A scan without predicates returns every row, including the NULL one.
  ASSERT_EQ(values.size() + 1, CountRows(table, {}));

  for (int128_t v : test_values) {
    SCOPED_TRACE(Substitute("test value: $0", v));

    { // value = v
      int count = count_if(values.begin(), values.end(),
                           [&] (int128_t value) { return value == v; });
      ASSERT_EQ(count, CountRows(table, {
          table->NewComparisonPredicate("value", KuduPredicate::EQUAL,
                                        KuduValue::FromDecimal(v, 2)),
      }));
    }

    { // value >= v
      int count = count_if(values.begin(), values.end(),
                           [&] (int128_t value) { return value >= v; });
      ASSERT_EQ(count, CountRows(table, {
          table->NewComparisonPredicate("value",
                                        KuduPredicate::GREATER_EQUAL,
                                        KuduValue::FromDecimal(v, 2)),
      }));
    }

    { // value <= v
      int count = count_if(values.begin(), values.end(),
                           [&] (int128_t value) { return value <= v; });
      ASSERT_EQ(count, CountRows(table, {
          table->NewComparisonPredicate("value",
                                        KuduPredicate::LESS_EQUAL,
                                        KuduValue::FromDecimal(v, 2)),
      }));
    }

    { // value > v
      int count = count_if(values.begin(), values.end(),
                           [&] (int128_t value) { return value > v; });
      ASSERT_EQ(count, CountRows(table, {
          table->NewComparisonPredicate("value",
                                        KuduPredicate::GREATER,
                                        KuduValue::FromDecimal(v, 2)),
      }));
    }

    { // value < v
      int count = count_if(values.begin(), values.end(),
                           [&] (int128_t value) { return value < v; });
      ASSERT_EQ(count, CountRows(table, {
          table->NewComparisonPredicate("value",
                                        KuduPredicate::LESS,
                                        KuduValue::FromDecimal(v, 2)),
      }));
    }

    { // value >= 0
      // value <= v
      int count = count_if(values.begin(), values.end(),
                           [&] (int128_t value) { return value >= 0 && value <= v; });
      ASSERT_EQ(count, CountRows(table, {
          table->NewComparisonPredicate("value",
                                        KuduPredicate::GREATER_EQUAL,
                                        KuduValue::FromDecimal(0, 2)),
          table->NewComparisonPredicate("value",
                                        KuduPredicate::LESS_EQUAL,
                                        KuduValue::FromDecimal(v, 2)),
      }));
    }

    { // value >= v
      // value <= 0
      int count = count_if(values.begin(), values.end(),
                           [&] (int128_t value) { return value >= v && value <= 0; });
      ASSERT_EQ(count, CountRows(table, {
          table->NewComparisonPredicate("value",
                                        KuduPredicate::GREATER_EQUAL,
                                        KuduValue::FromDecimal(v, 2)),
          table->NewComparisonPredicate("value",
                                        KuduPredicate::LESS_EQUAL,
                                        KuduValue::FromDecimal(0, 2)),
      }));
    }
  }

  // IN list and IN Bloom filter predicates
  std::mt19937 gen(SeedRandom());
  std::shuffle(test_values.begin(), test_values.end(), gen);

  // Check every prefix of the shuffled test values, from the empty prefix up
  // to the complete list. The prefix length drives the loop so that no
  // iterator is ever advanced beyond test_values.end().
  for (size_t prefix_len = 0; prefix_len <= test_values.size(); ++prefix_len) {
    const auto end = test_values.begin() + static_cast<std::ptrdiff_t>(prefix_len);
    vector<KuduValue*> vals;
    auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));

    for (auto itr = test_values.begin(); itr != end; ++itr) {
      vals.push_back(KuduValue::FromDecimal(*itr, 2));
      // The Bloom filter is keyed on the raw bytes of the unscaled value.
      auto key = *itr;
      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
    }

    const vector<int128_t> prefix(test_values.begin(), end);
    int count = CountMatchedRows<int128_t>(values, prefix);
    ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
    CheckInBloomFilterPredicate(table, bloom_filter, count);
  }

  // IS NOT NULL predicate
  ASSERT_EQ(values.size(),
            CountRows(table, { table->NewIsNotNullPredicate("value") }));

  // IS NULL predicate
  ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}

class BloomFilterPredicateTest : public PredicateTest {
 public:
  BloomFilterPredicateTest() : rand_(Random(SeedRandom())) {}
 protected:
  shared_ptr<KuduTable> table_;
  shared_ptr<KuduSession> session_;
  Random rand_;
  unordered_set<int32_t> all_values_;
  int32_t min_value_, max_value_;
  // Subset of "all_values_".
  vector<int32_t> included_values_;
  // Values that are not contained in "all_values_".
  unordered_set<int32_t> excluded_values_;
  // Number of false positives based on the number of values that'll be searched against.
  int num_false_positive_values_;

  // Initialize the members with the values that will be inserted to the table and Bloom filters.
  // The table will have 'num_all_values' unique values, and we'll generate two Bloom filters:
  // one with 'num_included_values' values from the table, and one with 'num_excluded_values'
  // values that aren't present in the table.
  void Init(int num_all_values, int num_included_values, int num_excluded_values) {
    ASSERT_LE(num_included_values, num_all_values);

    table_ = CreateAndOpenTable(KuduColumnSchema::INT32);
    session_ = CreateSession();

    const unordered_set<int32_t> empty_set;
    all_values_ = CreateRandomUniqueIntegers<int32_t>(num_all_values, empty_set, &rand_);
    auto min_max_pair = std::minmax_element(all_values_.begin(), all_values_.end());
    min_value_ = *min_max_pair.first;
    max_value_ = *min_max_pair.second;
    ReservoirSample(all_values_, num_included_values, empty_set, &rand_, &included_values_);
    excluded_values_ = CreateRandomUniqueIntegers<int32_t>(num_excluded_values, all_values_,
                                                           &rand_);
    // NOLINTNEXTLINE(bugprone-narrowing-conversions)
    num_false_positive_values_ = num_all_values * kBloomFilterFalsePositiveProb;
  }

  void InsertAllValuesInTable() {
    int i = 0;
    for (auto value : all_values_) {
      unique_ptr<KuduInsert> insert(table_->NewInsert());
      ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
      ASSERT_OK(insert->mutable_row()->SetInt32("value", value));
      ASSERT_OK(session_->Apply(insert.release()));
    }
    ASSERT_OK(session_->Flush());
  }

  // Combine supplied Bloom filter predicates that contains included values
  // with Range predicate.
  void TestWithRangePredicate(KuduPredicate* included_predicate1,
                              KuduPredicate* included_predicate2) {
    auto* less_predicate = table_->NewComparisonPredicate("value", KuduPredicate::LESS,
                                                          KuduValue::FromInt(min_value_));
    int actual_count_less;
    LOG_TIMING(INFO, Substitute("$0: Counting rows with a range predicate less than the min value",
                                CURRENT_TEST_NAME())) {
      actual_count_less = CountRows(table_, {included_predicate1, less_predicate});
    }
    EXPECT_EQ(0, actual_count_less);

    auto* ge_predicate = table_->NewComparisonPredicate("value", KuduPredicate::GREATER_EQUAL,
                                                        KuduValue::FromInt(min_value_));
    auto* le_predicate = table_->NewComparisonPredicate("value", KuduPredicate::LESS_EQUAL,
                                                        KuduValue::FromInt(max_value_));
    int actual_count_range;
    LOG_TIMING(INFO, Substitute("$0: Counting rows with a range predicate that includes all values",
                                CURRENT_TEST_NAME())) {
      actual_count_range = CountRows(table_,
                                     {included_predicate2, ge_predicate, le_predicate});
    }
    EXPECT_LE(included_values_.size(), actual_count_range);
    EXPECT_GE(included_values_.size() + num_false_positive_values_, actual_count_range);
  }
};

// Checks IN-Bloom-filter predicates built from KuduBloomFilter objects:
//  - a filter holding a subset of the table's values must match at least that
//    subset and at most the subset plus the false-positive allowance;
//  - a filter holding values absent from the table must match no more rows
//    than the false-positive allowance;
//  - clones of the first predicate are additionally combined with range
//    predicates via TestWithRangePredicate().
TEST_F(BloomFilterPredicateTest, TestKuduBloomFilterPredicate) {
  // The fixture helpers use fatal assertions, which only return from the
  // helper itself; stop here if set-up failed rather than run on a
  // half-initialized fixture.
  ASSERT_NO_FATAL_FAILURE(
      Init(100000/*num_all_values*/, 10000/*num_included_values*/, 10000/*num_excluded_values*/));
  KuduBloomFilter* included_bf = CreateBloomFilterWithValues(included_values_);
  KuduBloomFilter* excluded_bf = CreateBloomFilterWithValues(excluded_values_);

  ASSERT_NO_FATAL_FAILURE(InsertAllValuesInTable());

  vector<KuduBloomFilter*> included_bf_vec = { included_bf };
  auto* included_predicate =
      table_->NewInBloomFilterPredicate("value", &included_bf_vec);
  auto* included_predicate_clone1 = included_predicate->Clone();
  auto* included_predicate_clone2 = included_predicate->Clone();

  // The pointer-taking overload is expected to leave the input vector empty.
  ASSERT_TRUE(included_bf_vec.empty());
  int actual_count_included = CountRows(table_, { included_predicate });
  EXPECT_LE(included_values_.size(), actual_count_included);
  EXPECT_GE(included_values_.size() + num_false_positive_values_, actual_count_included);

  vector<KuduBloomFilter*> excluded_bf_vec = { excluded_bf };
  auto* excluded_predicate =
      table_->NewInBloomFilterPredicate("value", &excluded_bf_vec);
  ASSERT_TRUE(excluded_bf_vec.empty());
  int actual_count_excluded = CountRows(table_, { excluded_predicate });
  EXPECT_LE(0, actual_count_excluded);
  EXPECT_GE(num_false_positive_values_, actual_count_excluded);

  // Combine Range predicate with Bloom filter predicate.
  TestWithRangePredicate(included_predicate_clone1, included_predicate_clone2);
}

// Same as TestKuduBloomFilterPredicate above but using the overloaded NewInBloomFilterPredicate()
// client API with direct BlockBloomFilter pointer.
TEST_F(BloomFilterPredicateTest, TestDirectBlockBloomFilterPredicate) {
  // The fixture helpers use fatal assertions, which only return from the
  // helper itself; stop here if set-up failed rather than run on a
  // half-initialized fixture.
  ASSERT_NO_FATAL_FAILURE(
      Init(100000/*num_all_values*/, 10000/*num_included_values*/, 10000/*num_excluded_values*/));

  unique_ptr<BlockBloomFilter> included_bf = CreateDirectBloomFilterWithValues(included_values_);
  unique_ptr<BlockBloomFilter> excluded_bf = CreateDirectBloomFilterWithValues(excluded_values_);

  ASSERT_NO_FATAL_FAILURE(InsertAllValuesInTable());

  // Each Slice wraps the bytes of the BlockBloomFilter object itself.
  vector<Slice> included_bf_vec =
      { Slice(reinterpret_cast<const uint8_t*>(included_bf.get()), sizeof(*included_bf)) };
  const size_t included_bf_vec_size = included_bf_vec.size();

  auto* included_predicate =
      table_->NewInBloomFilterPredicate("value", included_bf_vec);
  auto* included_predicate_clone1 = included_predicate->Clone();
  auto* included_predicate_clone2 = included_predicate->Clone();

  // Unlike the pointer-taking overload, this one must leave the input vector untouched.
  ASSERT_EQ(included_bf_vec_size, included_bf_vec.size());
  int actual_count_included = CountRows(table_, { included_predicate });
  EXPECT_LE(included_values_.size(), actual_count_included);
  EXPECT_GE(included_values_.size() + num_false_positive_values_, actual_count_included);

  vector<Slice> excluded_bf_vec =
      { Slice(reinterpret_cast<const uint8_t*>(excluded_bf.get()), sizeof(*excluded_bf)) };
  const size_t excluded_bf_vec_size = excluded_bf_vec.size();
  auto* excluded_predicate =
      table_->NewInBloomFilterPredicate("value", excluded_bf_vec);
  ASSERT_EQ(excluded_bf_vec_size, excluded_bf_vec.size());

  int actual_count_excluded = CountRows(table_, { excluded_predicate });
  EXPECT_LE(0, actual_count_excluded);
  EXPECT_GE(num_false_positive_values_, actual_count_excluded);

  // Combine Range predicate with Bloom filter predicate.
  TestWithRangePredicate(included_predicate_clone1, included_predicate_clone2);
}

// Test to verify that an ineffective Bloom filter predicate will be disabled.
TEST_F(BloomFilterPredicateTest, TestDisabledBloomFilterPredicate) {
  // Insert all values from the table in Bloom filter so that the predicate
  // is determined to be ineffective.
  //
  // The fixture helpers use fatal assertions, which only return from the
  // helper itself; stop here if set-up failed rather than run on a
  // half-initialized fixture.
  ASSERT_NO_FATAL_FAILURE(
      Init(10000/*num_all_values*/, 10000/*num_included_values*/, 0/*num_excluded_values*/));
  KuduBloomFilter* included_bf = CreateBloomFilterWithValues(included_values_);

  ASSERT_NO_FATAL_FAILURE(InsertAllValuesInTable());

  vector<KuduBloomFilter*> included_bf_vec = { included_bf };
  auto* included_predicate =
      table_->NewInBloomFilterPredicate("value", &included_bf_vec);

  // The pointer-taking overload is expected to leave the input vector empty.
  ASSERT_TRUE(included_bf_vec.empty());
  FLAGS_predicate_effectivess_num_skip_blocks = 4;
  int actual_count_included = CountRows(table_, { included_predicate });
  // Every table value is in the filter, so every row must be returned.
  ASSERT_EQ(included_values_.size(), actual_count_included);

  // CountRows() runs 2 scans using a cloned predicate.
  CheckDisabledPredicates(GetTheOnlyTabletId(), 2 /* expected_disabled_predicates */);
}

// Benchmark test that combines Bloom filter predicate with range predicate.
TEST_F(BloomFilterPredicateTest, TestKuduBloomFilterPredicateBenchmark) {
  SKIP_IF_SLOW_NOT_ALLOWED();

  // Writing large number of rows sometimes leads to write timeouts. Hence the test below
  // uses 1M rows instead.
  //
  // The fixture helpers use fatal assertions, which only return from the
  // helper itself; stop here if set-up failed rather than benchmark a
  // half-initialized fixture.
  ASSERT_NO_FATAL_FAILURE(
      Init(1000000/*num_all_values*/, 10000/*num_included_values*/, 10000/*num_excluded_values*/));
  KuduBloomFilter* included_bf = CreateBloomFilterWithValues(included_values_);

  ASSERT_NO_FATAL_FAILURE(InsertAllValuesInTable());
  vector<KuduBloomFilter*> included_bf_vec = { included_bf };
  auto* included_predicate = table_->NewInBloomFilterPredicate("value", &included_bf_vec);
  auto* included_predicate_clone = included_predicate->Clone();

  // The timing itself is logged inside TestWithRangePredicate().
  TestWithRangePredicate(included_predicate, included_predicate_clone);
}

// Fixture parameterized on a pair of booleans. The test using it,
// TestDisabledBloomFilterWithRepeatedStrings, reads them as
// (flush_tablet, update_rows).
class ParameterizedBloomFilterPredicateTest :
    public PredicateTest,
    public ::testing::WithParamInterface<std::tuple<bool, bool>> {};

// Instantiate the suite once for every combination of the two booleans.
INSTANTIATE_TEST_SUITE_P(, ParameterizedBloomFilterPredicateTest,
                         ::testing::Combine(::testing::Bool(),
                                            ::testing::Bool()));

// Test to verify that an ineffective Bloom filter predicate will be disabled
// using a pattern of repeated strings.
TEST_P(ParameterizedBloomFilterPredicateTest, TestDisabledBloomFilterWithRepeatedStrings) {
  // Combination of following 2 flags help test cases with MRSs flushed, DRS and delta files.
  const bool flush_tablet = std::get<0>(GetParam());
  const bool update_rows = std::get<1>(GetParam());

  shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::STRING);
  shared_ptr<KuduSession> session = CreateSession();

  // Use a set of predetermined strings and populate the table.
  // Create a BF predicate with same set of strings.
  deque<string> values = {"Alice", "Bob", "Charlie", "Doug", "Elizabeth", "Frank", "George",
                          "Harry", ""};

  // Populate table with a small set of strings that are repeated.
  static constexpr int kNumRows = 10000;
  // Upserts rows with keys [0, kNumRows), cycling through 'values' in order.
  // The strings are passed without copying, so 'values' must stay unmodified
  // until the final Flush() in this lambda has completed.
  auto upsert_rows = [&]() {
    int i = 0;
    int last_flush = i;
    while (i < kNumRows) {
      for (const string& value : values) {
        if (i >= kNumRows) {
          break;
        }
        unique_ptr<KuduUpsert> upsert(table->NewUpsert());
        ASSERT_OK(upsert->mutable_row()->SetInt64("key", i));
        ASSERT_OK(upsert->mutable_row()->SetStringNoCopy("value", value));
        ASSERT_OK(session->Apply(upsert.release()));
        ++i;
      }
      // TSAN builds timeout and fail on flushing with large number of rows.
      if (i - last_flush >= 1000) {
        ASSERT_OK(session->Flush());
        last_flush = i;
      }
    }
    ASSERT_OK(session->Flush());
  };

  // Fatal assertions inside the lambda only return from the lambda, so
  // propagate them explicitly to stop the test on a failed write.
  ASSERT_NO_FATAL_FAILURE(upsert_rows());
  const string tablet_id = GetTheOnlyTabletId();

  if (flush_tablet) {
    ASSERT_OK(cluster_->FlushTablet(tablet_id));
  }

  if (update_rows) {
    // Rotate the list by one so that every key is rewritten with a different
    // string on the second pass.
    string first_name = values.front();
    values.pop_front();
    values.push_back(first_name);

    ASSERT_NO_FATAL_FAILURE(upsert_rows());
    if (flush_tablet) {
      ASSERT_OK(cluster_->FlushTablet(tablet_id));
    }
  }

  // Create Bloom filter predicate with string values.
  auto* bf = CreateBloomFilter(values.size());
  for (const auto& value : values) {
    bf->Insert(Slice(value));
  }
  vector<KuduBloomFilter*> bf_vec = { bf };
  auto* bf_predicate = table->NewInBloomFilterPredicate("value", &bf_vec);
  // The pointer-taking overload is expected to leave the input vector empty.
  ASSERT_TRUE(bf_vec.empty());

  FLAGS_predicate_effectivess_num_skip_blocks = 4;
  int actual_count_included = DoCountRows(table, { bf_predicate });
  // The filter contains every string in the table, so all rows must match.
  ASSERT_EQ(kNumRows, actual_count_included);

  CheckDisabledPredicates(tablet_id, 1 /* expected_disabled_predicates */);
}

// Fixture parameterized on the column data type of the table under test.
class ParameterizedPredicateTest : public PredicateTest,
  public ::testing::WithParamInterface<KuduColumnSchema::DataType> {};

// Instantiate the suite for the STRING, BINARY and VARCHAR column types.
INSTANTIATE_TEST_SUITE_P(, ParameterizedPredicateTest,
                         ::testing::Values(KuduColumnSchema::STRING,
                                           KuduColumnSchema::BINARY,
                                           KuduColumnSchema::VARCHAR));

// Populates a "value" column of the parameterized type (STRING, BINARY or
// VARCHAR) with CreateStringValues() plus one NULL row, then hands the table
// and the inserted values to CheckStringPredicates().
TEST_P(ParameterizedPredicateTest, TestIndirectDataPredicates) {
  shared_ptr<KuduTable> table = CreateAndOpenTable(GetParam());
  shared_ptr<KuduSession> session = CreateSession();

  vector<string> values = CreateStringValues();

  // Primary keys are handed out sequentially, one per inserted row.
  int next_key = 0;
  for (const string& text : values) {
    unique_ptr<KuduInsert> op(table->NewInsert());
    auto* row = op->mutable_row();
    ASSERT_OK(row->SetInt64("key", next_key++));
    // Use the setter that matches the column type under test.
    switch (GetParam()) {
      case KuduColumnSchema::STRING:
        ASSERT_OK(row->SetStringNoCopy("value", text));
        break;
      case KuduColumnSchema::BINARY:
        ASSERT_OK(row->SetBinaryNoCopy("value", text));
        break;
      case KuduColumnSchema::VARCHAR:
        ASSERT_OK(row->SetVarchar("value", text));
        break;
      default:
        LOG(FATAL) << "Invalid type";
        break;
    }
    ASSERT_OK(session->Apply(op.release()));
  }

  // One additional row whose "value" cell is NULL.
  unique_ptr<KuduInsert> null_op(table->NewInsert());
  auto* null_row = null_op->mutable_row();
  ASSERT_OK(null_row->SetInt64("key", next_key++));
  ASSERT_OK(null_row->SetNull("value"));
  ASSERT_OK(session->Apply(null_op.release()));
  ASSERT_OK(session->Flush());

  CheckStringPredicates(table, values);
}

} // namespace client
} // namespace kudu
