blob: 1a98804c9ea83cc431fba8f9964ae45251814bba [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/scoped_ptr.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <limits>
#include <vector>
#include "common/compiler-util.h"
#include "common/init.h"
#include "exec/hash-table.inline.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/slot-ref.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/string-value.h"
#include "runtime/test-env.h"
#include "runtime/tuple-row.h"
#include "service/fe-support.h"
#include "testutil/gtest-util.h"
#include "util/cpu-info.h"
#include "util/runtime-profile-counters.h"
#include "util/test-info.h"
#include "common/names.h"
using namespace std;
using namespace boost;
namespace impala {
class HashTableTest : public testing::Test {
public:
HashTableTest() : mem_pool_(&tracker_) {}
protected:
/// Temporary runtime environment for the hash table.
scoped_ptr<TestEnv> test_env_;
RuntimeState* runtime_state_;
/// Hash tables and associated clients - automatically closed in TearDown().
vector<BufferPool::ClientHandle*> clients_;
vector<HashTable*> hash_tables_;
ObjectPool pool_;
/// A dummy MemTracker used for exprs and other things we don't need to have limits on.
MemTracker tracker_;
MemPool mem_pool_;
vector<ScalarExpr*> build_exprs_;
vector<ScalarExprEvaluator*> build_expr_evals_;
vector<ScalarExpr*> probe_exprs_;
vector<ScalarExprEvaluator*> probe_expr_evals_;
int next_query_id_ = 0;
virtual void SetUp() {
test_env_.reset(new TestEnv());
ASSERT_OK(test_env_->Init());
RowDescriptor desc;
// Not very easy to test complex tuple layouts so this test will use the
// simplest. The purpose of these tests is to exercise the hash map
// internals so a simple build/probe expr is fine.
ScalarExpr* build_expr = pool_.Add(new SlotRef(TYPE_INT, 1, true /* nullable */));
ASSERT_OK(build_expr->Init(desc, true, nullptr));
build_exprs_.push_back(build_expr);
ASSERT_OK(ScalarExprEvaluator::Create(build_exprs_, nullptr, &pool_, &mem_pool_,
&mem_pool_, &build_expr_evals_));
ASSERT_OK(ScalarExprEvaluator::Open(build_expr_evals_, nullptr));
ScalarExpr* probe_expr = pool_.Add(new SlotRef(TYPE_INT, 1, true /* nullable */));
ASSERT_OK(probe_expr->Init(desc, true, nullptr));
probe_exprs_.push_back(probe_expr);
ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_,
&mem_pool_, &probe_expr_evals_));
ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr));
CreateTestEnv();
}
virtual void TearDown() {
ScalarExprEvaluator::Close(build_expr_evals_, nullptr);
ScalarExprEvaluator::Close(probe_expr_evals_, nullptr);
ScalarExpr::Close(build_exprs_);
ScalarExpr::Close(probe_exprs_);
for (HashTable* hash_table : hash_tables_) hash_table->Close();
hash_tables_.clear();
for (BufferPool::ClientHandle* client : clients_) {
test_env_->exec_env()->buffer_pool()->DeregisterClient(client);
}
clients_.clear();
runtime_state_ = nullptr;
test_env_.reset();
mem_pool_.FreeAll();
pool_.Clear();
}
/// Initialize test_env_ and runtime_state_ with the given page size and capacity
/// for the given number of pages. If test_env_ was already created, then re-creates it.
void CreateTestEnv(int64_t min_page_size = 64 * 1024,
int64_t buffer_bytes_limit = 4L * 1024 * 1024 * 1024) {
test_env_.reset(new TestEnv());
test_env_->SetBufferPoolArgs(min_page_size, buffer_bytes_limit);
ASSERT_OK(test_env_->Init());
TQueryOptions query_options;
query_options.__set_default_spillable_buffer_size(min_page_size);
query_options.__set_min_spillable_buffer_size(min_page_size);
query_options.__set_buffer_pool_limit(buffer_bytes_limit);
ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &runtime_state_));
}
TupleRow* CreateTupleRow(int32_t val) {
uint8_t* tuple_row_mem = mem_pool_.Allocate(sizeof(int32_t*));
Tuple* tuple_mem = Tuple::Create(sizeof(char) + sizeof(int32_t), &mem_pool_);
*reinterpret_cast<int32_t *>(tuple_mem->GetSlot(1)) = val;
tuple_mem->SetNotNull(NullIndicatorOffset(0,1));
TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
row->SetTuple(0, tuple_mem);
return row;
}
TupleRow* CreateNullTupleRow() {
uint8_t* tuple_row_mem = mem_pool_.Allocate(sizeof(int32_t*));
Tuple* tuple_mem = Tuple::Create(sizeof(int32_t), &mem_pool_);
tuple_mem->SetNull(NullIndicatorOffset(0,1));
TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
row->SetTuple(0, tuple_mem);
return row;
}
// Wrapper to call private methods on HashTable
// TODO: understand google testing, there must be a more natural way to do this
Status ResizeTable(
HashTable* table, int64_t new_size, HashTableCtx* ht_ctx, bool* success) {
return table->ResizeBuckets(new_size, ht_ctx, success);
}
// Do a full table scan on table. All values should be between [min,max). If
// all_unique, then each key(int value) should only appear once. Results are
// stored in results, indexed by the key. Results must have been preallocated to
// be at least max size.
void FullScan(HashTable* table, HashTableCtx* ht_ctx, int min, int max,
bool all_unique, TupleRow** results, TupleRow** expected) {
HashTable::Iterator iter = table->Begin(ht_ctx);
while (!iter.AtEnd()) {
TupleRow* row = iter.GetRow();
int32_t val = *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(row));
EXPECT_GE(val, min);
EXPECT_LT(val, max);
if (all_unique) {
EXPECT_TRUE(results[val] == nullptr);
}
EXPECT_EQ(row->GetTuple(0), expected[val]->GetTuple(0));
results[val] = row;
iter.Next();
}
}
// Validate that probe_row evaluates overs probe_exprs is equal to build_row
// evaluated over build_exprs
void ValidateMatch(TupleRow* probe_row, TupleRow* build_row) {
EXPECT_TRUE(probe_row != build_row);
int32_t build_val =
*reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(probe_row));
int32_t probe_val =
*reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(build_row));
EXPECT_EQ(build_val, probe_val);
}
struct ProbeTestData {
TupleRow* probe_row;
vector<TupleRow*> expected_build_rows;
};
void ProbeTest(HashTable* table, HashTableCtx* ht_ctx,
ProbeTestData* data, int num_data, bool scan) {
for (int i = 0; i < num_data; ++i) {
TupleRow* row = data[i].probe_row;
HashTable::Iterator iter;
if (ht_ctx->EvalAndHashProbe(row)) continue;
iter = table->FindProbeRow(ht_ctx);
if (data[i].expected_build_rows.size() == 0) {
EXPECT_TRUE(iter.AtEnd());
} else {
if (scan) {
map<TupleRow*, bool> matched;
while (!iter.AtEnd()) {
EXPECT_EQ(matched.find(iter.GetRow()), matched.end());
matched[iter.GetRow()] = true;
iter.Next();
}
EXPECT_EQ(matched.size(), data[i].expected_build_rows.size());
for (int j = 0; i < data[j].expected_build_rows.size(); ++j) {
EXPECT_TRUE(matched[data[i].expected_build_rows[j]]);
}
} else {
EXPECT_EQ(data[i].expected_build_rows.size(), 1);
EXPECT_EQ(data[i].expected_build_rows[0]->GetTuple(0),
iter.GetRow()->GetTuple(0));
ValidateMatch(row, iter.GetRow());
}
}
}
}
/// Construct hash table and buffer pool client.
/// Returns true if HashTable::Init() was successful. Created objects
/// and resources (e.g. reservations) are automatically freed in TearDown().
bool CreateHashTable(bool quadratic, int64_t initial_num_buckets, HashTable** table,
int64_t block_size = 8 * 1024 * 1024, int max_num_blocks = 100,
int initial_reserved_blocks = 10, int64_t suballocator_buffer_len = 64 * 1024) {
BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool();
RuntimeProfile* profile = RuntimeProfile::Create(&pool_, "ht");
// Set up memory tracking for the hash table.
MemTracker* client_tracker =
pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
int64_t initial_reservation_bytes = block_size * initial_reserved_blocks;
int64_t max_reservation_bytes = block_size * max_num_blocks;
// Set up the memory allocator.
BufferPool::ClientHandle* client = pool_.Add(new BufferPool::ClientHandle);
clients_.push_back(client);
EXPECT_OK(buffer_pool->RegisterClient("", nullptr,
runtime_state_->instance_buffer_reservation(), client_tracker,
max_reservation_bytes, profile, client));
EXPECT_TRUE(client->IncreaseReservation(initial_reservation_bytes));
Suballocator* allocator =
pool_.Add(new Suballocator(buffer_pool, client, suballocator_buffer_len));
// Initial_num_buckets must be a power of two.
EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets));
int64_t max_num_buckets = 1L << 31;
*table = pool_.Add(new HashTable(
quadratic, allocator, true, 1, nullptr, max_num_buckets, initial_num_buckets));
hash_tables_.push_back(*table);
bool success;
Status status = (*table)->Init(&success);
EXPECT_OK(status);
return status.ok() && success;
}
// Constructs and closes a hash table.
void SetupTest(bool quadratic, int64_t initial_num_buckets, bool too_big) {
TupleRow* build_row1 = CreateTupleRow(1);
TupleRow* build_row2 = CreateTupleRow(2);
TupleRow* probe_row3 = CreateTupleRow(3);
TupleRow* probe_row4 = CreateTupleRow(4);
int32_t* val_row1 =
reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row1));
EXPECT_EQ(*val_row1, 1);
int32_t* val_row2 =
reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row2));
EXPECT_EQ(*val_row2, 2);
int32_t* val_row3 =
reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row3));
EXPECT_EQ(*val_row3, 3);
int32_t* val_row4 =
reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row4));
EXPECT_EQ(*val_row4, 4);
// Create and close the hash table.
HashTable* hash_table;
bool initialized = CreateHashTable(quadratic, initial_num_buckets, &hash_table);
EXPECT_EQ(too_big, !initialized);
if (initialized && initial_num_buckets > 0) {
EXPECT_NE(hash_table->ByteSize(), 0);
}
}
// IMPALA-2897: Build rows that are equivalent (where nullptrs are counted as equivalent)
// should not occupy distinct buckets.
void NullBuildRowTest() {
TupleRow* build_rows[2];
for (int i = 0; i < 2; ++i) build_rows[i] = CreateNullTupleRow();
// Create the hash table and insert the build rows
HashTable* hash_table;
ASSERT_TRUE(CreateHashTable(true, 1024, &hash_table));
scoped_ptr<HashTableCtx> ht_ctx;
EXPECT_OK(HashTableCtx::Create(&pool_, runtime_state_,
build_exprs_, probe_exprs_, true /* stores_nulls_ */,
vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &mem_pool_,
&mem_pool_, &ht_ctx));
EXPECT_OK(ht_ctx->Open(runtime_state_));
for (int i = 0; i < 2; ++i) {
if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
Status status;
bool inserted =
hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status);
EXPECT_TRUE(inserted);
ASSERT_OK(status);
}
EXPECT_EQ(hash_table->num_buckets() - hash_table->EmptyBuckets(), 1);
ht_ctx->Close(runtime_state_);
}
// This test inserts the build rows [0->5) to hash table. It validates that they
// are all there using a full table scan. It also validates that Find() is correct
// testing for probe rows that are both there and not.
// The hash table is resized a few times and the scans/finds are tested again.
void BasicTest(bool quadratic, int initial_num_buckets) {
TupleRow* build_rows[5];
TupleRow* scan_rows[5] = {0};
for (int i = 0; i < 5; ++i) build_rows[i] = CreateTupleRow(i);
ProbeTestData probe_rows[10];
for (int i = 0; i < 10; ++i) {
probe_rows[i].probe_row = CreateTupleRow(i);
if (i < 5) probe_rows[i].expected_build_rows.push_back(build_rows[i]);
}
// Create the hash table and insert the build rows
HashTable* hash_table;
ASSERT_TRUE(CreateHashTable(quadratic, initial_num_buckets, &hash_table));
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
probe_exprs_, false /* !stores_nulls_ */,
vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &mem_pool_,
&mem_pool_, &ht_ctx);
EXPECT_OK(status);
EXPECT_OK(ht_ctx->Open(runtime_state_));
bool success;
EXPECT_OK(hash_table->CheckAndResize(5, ht_ctx.get(), &success));
ASSERT_TRUE(success);
for (int i = 0; i < 5; ++i) {
if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
bool inserted =
hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status);
EXPECT_TRUE(inserted);
ASSERT_OK(status);
}
EXPECT_EQ(hash_table->size(), 5);
// Do a full table scan and validate returned pointers
FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
// Double the size of the hash table and scan again.
EXPECT_OK(ResizeTable(hash_table, 2048, ht_ctx.get(), &success));
EXPECT_TRUE(success);
EXPECT_EQ(hash_table->num_buckets(), 2048);
EXPECT_EQ(hash_table->size(), 5);
memset(scan_rows, 0, sizeof(scan_rows));
FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
// Try to shrink and scan again.
EXPECT_OK(ResizeTable(hash_table, 64, ht_ctx.get(), &success));
EXPECT_TRUE(success);
EXPECT_EQ(hash_table->num_buckets(), 64);
EXPECT_EQ(hash_table->size(), 5);
memset(scan_rows, 0, sizeof(scan_rows));
FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
// Resize to 8, which is the smallest value to fit the number of filled buckets.
EXPECT_OK(ResizeTable(hash_table, 8, ht_ctx.get(), &success));
EXPECT_TRUE(success);
EXPECT_EQ(hash_table->num_buckets(), 8);
EXPECT_EQ(hash_table->size(), 5);
memset(scan_rows, 0, sizeof(scan_rows));
FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
ht_ctx->Close(runtime_state_);
}
void ScanTest(
bool quadratic, int initial_size, int rows_to_insert, int additional_rows) {
HashTable* hash_table;
ASSERT_TRUE(CreateHashTable(quadratic, initial_size, &hash_table));
int total_rows = rows_to_insert + additional_rows;
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
probe_exprs_, false /* !stores_nulls_ */,
vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &mem_pool_,
&mem_pool_, &ht_ctx);
EXPECT_OK(status);
EXPECT_OK(ht_ctx->Open(runtime_state_));
// Add 1 row with val 1, 2 with val 2, etc.
bool success;
vector<TupleRow*> build_rows;
ProbeTestData* probe_rows = new ProbeTestData[total_rows];
probe_rows[0].probe_row = CreateTupleRow(0);
for (int val = 1; val <= rows_to_insert; ++val) {
EXPECT_OK(hash_table->CheckAndResize(val, ht_ctx.get(), &success));
EXPECT_TRUE(success) << " failed to resize: " << val;
probe_rows[val].probe_row = CreateTupleRow(val);
for (int i = 0; i < val; ++i) {
TupleRow* row = CreateTupleRow(val);
if (!ht_ctx->EvalAndHashBuild(row)) continue;
BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
ASSERT_TRUE(hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status));
ASSERT_OK(status);
build_rows.push_back(row);
probe_rows[val].expected_build_rows.push_back(row);
}
}
// Add some more probe rows that aren't there.
for (int val = rows_to_insert; val < rows_to_insert + additional_rows; ++val) {
probe_rows[val].probe_row = CreateTupleRow(val);
}
// Test that all the builds were found.
ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true);
// Resize and try again.
int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows);
EXPECT_OK(ResizeTable(hash_table, target_size, ht_ctx.get(), &success));
EXPECT_TRUE(success);
EXPECT_EQ(hash_table->num_buckets(), target_size);
ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true);
target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1);
EXPECT_OK(ResizeTable(hash_table, target_size, ht_ctx.get(), &success));
EXPECT_TRUE(success);
EXPECT_EQ(hash_table->num_buckets(), target_size);
ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true);
delete [] probe_rows;
ht_ctx->Close(runtime_state_);
}
// This test continues adding tuples to the hash table and exercises the resize code
// paths.
void GrowTableTest(bool quadratic) {
uint64_t num_to_add = 4;
int expected_size = 0;
// Need enough memory for two hash table bucket directories during resize.
const int64_t mem_limit_mb = 128 + 64;
HashTable* hash_table;
ASSERT_TRUE(
CreateHashTable(quadratic, num_to_add, &hash_table, 1024 * 1024, mem_limit_mb));
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
probe_exprs_, false /* !stores_nulls_ */,
vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &mem_pool_,
&mem_pool_, &ht_ctx);
EXPECT_OK(status);
// Inserts num_to_add + (num_to_add^2) + (num_to_add^4) + ... + (num_to_add^20)
// entries. When num_to_add == 4, then the total number of inserts is 4194300.
int build_row_val = 0;
for (int i = 0; i < 20; ++i) {
bool success;
EXPECT_OK(hash_table->CheckAndResize(num_to_add, ht_ctx.get(), &success));
EXPECT_TRUE(success) << " failed to resize: " << num_to_add << "\n"
<< tracker_.LogUsage(MemTracker::UNLIMITED_DEPTH) << "\n"
<< clients_.back()->DebugString();
for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
TupleRow* row = CreateTupleRow(build_row_val);
if (!ht_ctx->EvalAndHashBuild(row)) continue;
BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status);
ASSERT_OK(status);
if (!inserted) goto done_inserting;
}
expected_size += num_to_add;
num_to_add *= 2;
}
done_inserting:
EXPECT_EQ(hash_table->size(), 4194300);
// The next allocation should put us over the limit, since we'll need 128MB for
// the old buckets and 256MB for the new buckets.
bool success;
EXPECT_OK(hash_table->CheckAndResize(num_to_add * 2, ht_ctx.get(), &success));
EXPECT_FALSE(success);
// Validate that we can find the entries before we went over the limit
for (int i = 0; i < expected_size * 5; i += 100000) {
TupleRow* probe_row = CreateTupleRow(i);
if (!ht_ctx->EvalAndHashProbe(probe_row)) continue;
HashTable::Iterator iter = hash_table->FindProbeRow(ht_ctx.get());
if (i < hash_table->size()) {
EXPECT_TRUE(!iter.AtEnd()) << " i: " << i;
ValidateMatch(probe_row, iter.GetRow());
} else {
EXPECT_TRUE(iter.AtEnd()) << " i: " << i;
}
}
// Insert duplicates to also hit OOM.
int64_t num_duplicates_inserted = 0;
const int DUPLICATE_VAL = 1234;
while (true) {
TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL);
if (!ht_ctx->EvalAndHashBuild(duplicate_row)) continue;
BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
bool inserted =
hash_table->Insert(ht_ctx.get(), dummy_flat_row, duplicate_row, &status);
ASSERT_OK(status);
if (!inserted) break;
++num_duplicates_inserted;
}
// Check that the duplicates that we successfully inserted are all present.
TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL);
ASSERT_TRUE(ht_ctx->EvalAndHashProbe(duplicate_row));
HashTable::Iterator iter = hash_table->FindProbeRow(ht_ctx.get());
ValidateMatch(duplicate_row, iter.GetRow());
for (int64_t i = 0; i < num_duplicates_inserted; ++i) {
ASSERT_FALSE(iter.AtEnd());
iter.NextDuplicate();
ValidateMatch(duplicate_row, iter.GetRow());
}
iter.NextDuplicate();
EXPECT_TRUE(iter.AtEnd());
ht_ctx->Close(runtime_state_);
}
// This test inserts and probes as many elements as the size of the hash table without
// calling resize. All the inserts and probes are expected to succeed, because there is
// enough space in the hash table (it is also expected to be slow). It also expects that
// a probe for a N+1 element will return BUCKET_NOT_FOUND.
void InsertFullTest(bool quadratic, int table_size) {
HashTable* hash_table;
ASSERT_TRUE(CreateHashTable(quadratic, table_size, &hash_table));
EXPECT_EQ(hash_table->EmptyBuckets(), table_size);
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
probe_exprs_, false /* !stores_nulls_ */,
vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_,
&mem_pool_, &mem_pool_, &ht_ctx);
EXPECT_OK(status);
// Insert and probe table_size different tuples. All of them are expected to be
// successfully inserted and probed.
uint32_t hash = 0;
HashTable::Iterator iter;
bool found;
for (int build_row_val = 0; build_row_val < table_size; ++build_row_val) {
TupleRow* row = CreateTupleRow(build_row_val);
bool passes = ht_ctx->EvalAndHashBuild(row);
hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
EXPECT_TRUE(passes);
// Insert using both Insert() and FindBucket() methods.
if (build_row_val % 2 == 0) {
BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status);
EXPECT_TRUE(inserted);
ASSERT_OK(status);
} else {
iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
EXPECT_FALSE(iter.AtEnd());
EXPECT_FALSE(found);
iter.SetTuple(row->GetTuple(0), hash);
}
EXPECT_EQ(hash_table->EmptyBuckets(), table_size - build_row_val - 1);
passes = ht_ctx->EvalAndHashProbe(row);
(void)ht_ctx->expr_values_cache()->CurExprValuesHash();
EXPECT_TRUE(passes);
iter = hash_table->FindProbeRow(ht_ctx.get());
EXPECT_FALSE(iter.AtEnd());
EXPECT_EQ(row->GetTuple(0), iter.GetTuple());
iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
EXPECT_FALSE(iter.AtEnd());
EXPECT_TRUE(found);
EXPECT_EQ(row->GetTuple(0), iter.GetTuple());
}
// Probe for a tuple that does not exist. This should exercise the probe of a full
// hash table code path.
EXPECT_EQ(hash_table->EmptyBuckets(), 0);
TupleRow* probe_row = CreateTupleRow(table_size);
bool passes = ht_ctx->EvalAndHashProbe(probe_row);
EXPECT_TRUE(passes);
iter = hash_table->FindProbeRow(ht_ctx.get());
EXPECT_TRUE(iter.AtEnd());
// Since hash_table is full, FindBucket cannot find an empty bucket, so returns End().
iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
EXPECT_TRUE(iter.AtEnd());
EXPECT_FALSE(found);
ht_ctx->Close(runtime_state_);
}
// This test makes sure we can tolerate the low memory case where we do not have enough
// memory to allocate the array of buckets for the hash table.
void VeryLowMemTest(bool quadratic) {
const int64_t block_size = 2 * 1024;
const int max_num_blocks = 1;
const int table_size = 1024;
CreateTestEnv(block_size, block_size * max_num_blocks);
HashTable* hash_table;
ASSERT_FALSE(CreateHashTable(
quadratic, table_size, &hash_table, block_size, max_num_blocks, 0, 1024));
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
probe_exprs_, false /* !stores_nulls_ */, vector<bool>(build_exprs_.size(), false), 1, 0, 1,
&mem_pool_, &mem_pool_, &mem_pool_, &ht_ctx);
EXPECT_OK(status);
HashTable::Iterator iter = hash_table->Begin(ht_ctx.get());
EXPECT_TRUE(iter.AtEnd());
ht_ctx->Close(runtime_state_);
}
};
TEST_F(HashTableTest, LinearSetupTest) {
SetupTest(false, 1, false);
SetupTest(false, 1024, false);
SetupTest(false, 65536, false);
// Regression test for IMPALA-2065. Trying to init a hash table with large (>2^31)
// number of buckets.
SetupTest(false, 4294967296, true); // 2^32
}
TEST_F(HashTableTest, QuadraticSetupTest) {
SetupTest(true, 1, false);
SetupTest(true, 1024, false);
SetupTest(true, 65536, false);
// Regression test for IMPALA-2065. Trying to init a hash table with large (>2^31)
// number of buckets.
SetupTest(true, 4294967296, true); // 2^32
}
TEST_F(HashTableTest, NullBuildRowTest) {
NullBuildRowTest();
}
TEST_F(HashTableTest, LinearBasicTest) {
BasicTest(false, 1);
BasicTest(false, 1024);
BasicTest(false, 65536);
}
TEST_F(HashTableTest, QuadraticBasicTest) {
BasicTest(true, 1);
BasicTest(true, 1024);
BasicTest(true, 65536);
}
// This test makes sure we can scan ranges of buckets.
TEST_F(HashTableTest, LinearScanTest) {
ScanTest(false, 1, 10, 5);
ScanTest(false, 1024, 1000, 5);
ScanTest(false, 1024, 1000, 500);
}
TEST_F(HashTableTest, QuadraticScanTest) {
ScanTest(true, 1, 10, 5);
ScanTest(true, 1024, 1000, 5);
ScanTest(true, 1024, 1000, 500);
}
TEST_F(HashTableTest, LinearGrowTableTest) {
GrowTableTest(false);
}
TEST_F(HashTableTest, QuadraticGrowTableTest) {
GrowTableTest(true);
}
TEST_F(HashTableTest, LinearInsertFullTest) {
InsertFullTest(false, 1);
InsertFullTest(false, 4);
InsertFullTest(false, 64);
InsertFullTest(false, 1024);
InsertFullTest(false, 65536);
}
TEST_F(HashTableTest, QuadraticInsertFullTest) {
InsertFullTest(true, 1);
InsertFullTest(true, 4);
InsertFullTest(true, 64);
InsertFullTest(true, 1024);
InsertFullTest(true, 65536);
}
// Test that hashing empty string updates hash value.
TEST_F(HashTableTest, HashEmpty) {
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
probe_exprs_, false /* !stores_nulls_ */,
vector<bool>(build_exprs_.size(), false), 1, 2, 1, &mem_pool_, &mem_pool_,
&mem_pool_, &ht_ctx);
EXPECT_OK(status);
EXPECT_OK(ht_ctx->Open(runtime_state_));
uint32_t seed = 9999;
ht_ctx->set_level(0);
EXPECT_NE(seed, ht_ctx->Hash(nullptr, 0, seed));
// TODO: level 0 uses CRC hash, which only swaps bytes around on empty input.
// EXPECT_NE(seed, ht_ctx->Hash(nullptr, 0, ht_ctx->Hash(nullptr, 0, seed)));
ht_ctx->set_level(1);
EXPECT_NE(seed, ht_ctx->Hash(nullptr, 0, seed));
EXPECT_NE(seed, ht_ctx->Hash(nullptr, 0, ht_ctx->Hash(nullptr, 0, seed)));
ht_ctx->Close(runtime_state_);
}
TEST_F(HashTableTest, VeryLowMemTest) {
VeryLowMemTest(true);
VeryLowMemTest(false);
}
}