blob: e8e7e2e8b64a31c54b23ab232c77d457d8d67367 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/bloom-filter.h"
#include <algorithm>
#include <unordered_set>
#include <vector>
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/mem-tracker.h"
#include "runtime/test-env.h"
#include "service/fe-support.h"
#include "testutil/gtest-util.h"
using namespace std;
namespace {
using namespace impala;
// Make a random uint64_t, avoiding the absent high bit and the low-entropy low bits
// produced by rand().
uint64_t MakeRand() {
uint32_t result = (rand() >> 8) & 0xffff;
result <<= 16;
result |= (rand() >> 8) & 0xffff;
return result;
}
// BfInsert() and BfFind() are like BloomFilter::{Insert,Find}, except they randomly
// disable AVX2 instructions half of the time. These are used for testing that AVX2
// machines and non-AVX2 machines produce compatible BloomFilters.
void BfInsert(BloomFilter& bf, uint32_t h) {
if (MakeRand() & 0x1) {
bf.Insert(h);
} else {
CpuInfo::TempDisable t1(CpuInfo::AVX2);
bf.Insert(h);
}
}
bool BfFind(BloomFilter& bf, uint32_t h) {
if (MakeRand() & 0x1) {
return bf.Find(h);
} else {
CpuInfo::TempDisable t1(CpuInfo::AVX2);
return bf.Find(h);
}
}
// Computes union of 'x' and 'y'. Computes twice with AVX enabled and disabled and
// verifies both produce the same result. 'success' is set to true if both union
// computations returned the same result and set to false otherwise.
TBloomFilter BfUnion(const BloomFilter& x, const BloomFilter& y, bool* success) {
TBloomFilter thrift_x, thrift_y;
BloomFilter::ToThrift(&x, &thrift_x);
BloomFilter::ToThrift(&y, &thrift_y);
BloomFilter::Or(thrift_x, &thrift_y);
{
CpuInfo::TempDisable t1(CpuInfo::AVX);
CpuInfo::TempDisable t2(CpuInfo::AVX2);
TBloomFilter thrift_x2, thrift_y2;
BloomFilter::ToThrift(&x, &thrift_x2);
BloomFilter::ToThrift(&y, &thrift_y2);
BloomFilter::Or(thrift_x2, &thrift_y2);
*success = thrift_y.directory == thrift_y2.directory;
}
return thrift_y;
}
} // namespace
namespace impala {
// Test that MaxNdv() and MinLogSpace() are dual
TEST(BloomFilter, MinSpaceMaxNdv) {
for (int log2fpp = -2; log2fpp >= -63; --log2fpp) {
const double fpp = pow(2, log2fpp);
for (int given_log_space = 8; given_log_space < 30; ++given_log_space) {
const size_t derived_ndv = BloomFilter::MaxNdv(given_log_space, fpp);
int derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp);
EXPECT_EQ(derived_log_space, given_log_space) << "fpp: " << fpp
<< " derived_ndv: " << derived_ndv;
// If we lower the fpp, we need more space; if we raise it we need less.
derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp / 2);
EXPECT_GE(derived_log_space, given_log_space) << "fpp: " << fpp
<< " derived_ndv: " << derived_ndv;
derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp * 2);
EXPECT_LE(derived_log_space, given_log_space) << "fpp: " << fpp
<< " derived_ndv: " << derived_ndv;
}
for (size_t given_ndv = 1000; given_ndv < 1000 * 1000; given_ndv *= 3) {
const int derived_log_space = BloomFilter::MinLogSpace(given_ndv, fpp);
const size_t derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp);
// The max ndv is close to, but larger than, then ndv we asked for
EXPECT_LE(given_ndv, derived_ndv) << "fpp: " << fpp
<< " derived_log_space: " << derived_log_space;
EXPECT_GE(given_ndv * 2, derived_ndv)
<< "fpp: " << fpp << " derived_log_space: " << derived_log_space;
// Changing the fpp changes the ndv capacity in the expected direction.
size_t new_derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp / 2);
EXPECT_GE(derived_ndv, new_derived_ndv)
<< "fpp: " << fpp << " derived_log_space: " << derived_log_space;
new_derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp * 2);
EXPECT_LE(derived_ndv, new_derived_ndv)
<< "fpp: " << fpp << " derived_log_space: " << derived_log_space;
}
}
}
TEST(BloomFilter, MinSpaceEdgeCase) {
int min_space = BloomFilter::MinLogSpace(1, 0.75);
EXPECT_GE(min_space, 0) << "LogSpace should always be >= 0";
}
// Check that MinLogSpace() and FalsePositiveProb() are dual
TEST(BloomFilter, MinSpaceForFpp) {
for (size_t ndv = 10000; ndv < 100 * 1000 * 1000; ndv *= 1.01) {
for (double fpp = 0.1; fpp > pow(2, -20); fpp *= 0.99) { // NOLINT: loop on double
// When contructing a Bloom filter, we can request a particular fpp by calling
// MinLogSpace().
const int min_log_space = BloomFilter::MinLogSpace(ndv, fpp);
// However, at the resulting ndv and space, the expected fpp might be lower than
// the one that was requested.
double expected_fpp = BloomFilter::FalsePositiveProb(ndv, min_log_space);
EXPECT_LE(expected_fpp, fpp);
// The fpp we get might be much lower than the one we asked for. However, if the
// space were just one size smaller, the fpp we get would be larger than the one we
// asked for.
expected_fpp = BloomFilter::FalsePositiveProb(ndv, min_log_space - 1);
EXPECT_GE(expected_fpp, fpp);
// Therefore, the return value of MinLogSpace() is actually the minimum
// log space at which we can guarantee the requested fpp.
}
}
}
class BloomFilterTest : public testing::Test {
protected:
/// Temporary runtime environment for the BloomFilters.
unique_ptr<TestEnv> test_env_;
RuntimeState* runtime_state_;
ObjectPool pool_;
unique_ptr<MemTracker> tracker_;
unique_ptr<BufferPool::ClientHandle> buffer_pool_client_;
vector<BloomFilter*> bloom_filters_;
virtual void SetUp() {
int64_t min_page_size = 64; // Min filter size that we allocate in our tests.
int64_t buffer_bytes_limit = 4L * 1024 * 1024 * 1024;
test_env_.reset(new TestEnv());
test_env_->SetBufferPoolArgs(min_page_size, buffer_bytes_limit);
ASSERT_OK(test_env_->Init());
ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
buffer_pool_client_.reset(new BufferPool::ClientHandle);
tracker_.reset(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool();
ASSERT_OK(buffer_pool->RegisterClient("", nullptr,
runtime_state_->instance_buffer_reservation(), tracker_.get(),
std::numeric_limits<int64>::max(), runtime_state_->runtime_profile(),
buffer_pool_client_.get()));
}
virtual void TearDown() {
for (BloomFilter* filter : bloom_filters_) filter->Close();
bloom_filters_.clear();
runtime_state_ = nullptr;
pool_.Clear();
test_env_->exec_env()->buffer_pool()->DeregisterClient(buffer_pool_client_.get());
buffer_pool_client_.reset();
tracker_.reset();
test_env_.reset();
}
BloomFilter* CreateBloomFilter(int log_bufferpool_space) {
int64_t filter_size = BloomFilter::GetExpectedMemoryUsed(log_bufferpool_space);
EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
EXPECT_OK(bloom_filter->Init(log_bufferpool_space));
bloom_filters_.push_back(bloom_filter);
EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
return bloom_filter;
}
BloomFilter* CreateBloomFilter(TBloomFilter t_filter) {
int64_t filter_size =
BloomFilter::GetExpectedMemoryUsed(t_filter.log_bufferpool_space);
EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
EXPECT_OK(bloom_filter->Init(t_filter));
bloom_filters_.push_back(bloom_filter);
EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
return bloom_filter;
}
};
// We can construct (and destruct) Bloom filters with different spaces.
TEST_F(BloomFilterTest, Constructor) {
for (int i = 1; i < 30; ++i) {
CreateBloomFilter(i);
}
}
// We can Insert() hashes into a Bloom filter with different spaces.
TEST_F(BloomFilterTest, Insert) {
srand(0);
for (int i = 13; i < 17; ++i) {
BloomFilter* bf = CreateBloomFilter(i);
for (int k = 0; k < (1 << 15); ++k) {
BfInsert(*bf, MakeRand());
}
}
}
// After Insert()ing something into a Bloom filter, it can be found again immediately.
TEST_F(BloomFilterTest, Find) {
srand(0);
for (int i = 13; i < 17; ++i) {
BloomFilter* bf = CreateBloomFilter(i);
for (int k = 0; k < (1 << 15); ++k) {
const uint64_t to_insert = MakeRand();
BfInsert(*bf, to_insert);
EXPECT_TRUE(BfFind(*bf, to_insert));
}
}
}
// After Insert()ing something into a Bloom filter, it can be found again much later.
TEST_F(BloomFilterTest, CumulativeFind) {
srand(0);
for (int i = 5; i < 11; ++i) {
std::vector<uint32_t> inserted;
BloomFilter* bf = CreateBloomFilter(i);
for (int k = 0; k < (1 << 10); ++k) {
const uint32_t to_insert = MakeRand();
inserted.push_back(to_insert);
BfInsert(*bf, to_insert);
for (int n = 0; n < inserted.size(); ++n) {
EXPECT_TRUE(BfFind(*bf, inserted[n]));
}
}
}
}
// The empirical false positives we find when looking for random items is with a constant
// factor of the false positive probability the Bloom filter was constructed for.
TEST_F(BloomFilterTest, FindInvalid) {
srand(0);
static const int find_limit = 1 << 20;
unordered_set<uint32_t> to_find;
while (to_find.size() < find_limit) {
to_find.insert(MakeRand());
}
static const int max_log_ndv = 19;
unordered_set<uint32_t> to_insert;
while (to_insert.size() < (1ull << max_log_ndv)) {
const auto candidate = MakeRand();
if (to_find.find(candidate) == to_find.end()) {
to_insert.insert(candidate);
}
}
vector<uint32_t> shuffled_insert(to_insert.begin(), to_insert.end());
for (int log_ndv = 12; log_ndv < max_log_ndv; ++log_ndv) {
for (int log_fpp = 4; log_fpp < 15; ++log_fpp) {
double fpp = 1.0 / (1 << log_fpp);
const size_t ndv = 1 << log_ndv;
const int log_bufferpool_space = BloomFilter::MinLogSpace(ndv, fpp);
BloomFilter* bf = CreateBloomFilter(log_bufferpool_space);
// Fill up a BF with exactly as much ndv as we planned for it:
for (size_t i = 0; i < ndv; ++i) {
BfInsert(*bf, shuffled_insert[i]);
}
int found = 0;
// Now we sample from the set of possible hashes, looking for hits.
for (const auto& i : to_find) {
found += BfFind(*bf, i);
}
EXPECT_LE(found, find_limit * fpp * 2)
<< "Too many false positives with -log2(fpp) = " << log_fpp;
// Because the space is rounded up to a power of 2, we might actually get a lower
// fpp than the one passed to MinLogSpace().
const double expected_fpp =
BloomFilter::FalsePositiveProb(ndv, log_bufferpool_space);
EXPECT_GE(found, find_limit * expected_fpp)
<< "Too few false positives with -log2(fpp) = " << log_fpp;
EXPECT_LE(found, find_limit * expected_fpp * 8)
<< "Too many false positives with -log2(fpp) = " << log_fpp;
}
}
}
TEST_F(BloomFilterTest, Thrift) {
BloomFilter* bf = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
for (int i = 0; i < 10; ++i) BfInsert(*bf, i);
// Check no unexpected new false positives.
unordered_set<int> missing_ints;
for (int i = 11; i < 100; ++i) {
if (!BfFind(*bf, i)) missing_ints.insert(i);
}
TBloomFilter to_thrift;
BloomFilter::ToThrift(bf, &to_thrift);
EXPECT_EQ(to_thrift.always_true, false);
BloomFilter* from_thrift = CreateBloomFilter(to_thrift);
for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_thrift, i));
for (int missing: missing_ints) ASSERT_FALSE(BfFind(*from_thrift, missing));
BloomFilter::ToThrift(NULL, &to_thrift);
EXPECT_EQ(to_thrift.always_true, true);
}
TEST_F(BloomFilterTest, ThriftOr) {
BloomFilter* bf1 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
BloomFilter* bf2 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
for (int i = 60; i < 80; ++i) BfInsert(*bf2, i);
for (int i = 0; i < 10; ++i) BfInsert(*bf1, i);
bool success;
BloomFilter *bf3 = CreateBloomFilter(BfUnion(*bf1, *bf2, &success));
ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
// Insert another value to aggregated BloomFilter.
for (int i = 11; i < 50; ++i) BfInsert(*bf3, i);
// Apply TBloomFilter back to BloomFilter and verify if aggregation was correct.
BloomFilter *bf4 = CreateBloomFilter(BfUnion(*bf1, *bf3, &success));
ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
ASSERT_FALSE(BfFind(*bf4, 81));
}
} // namespace impala