blob: 54f8b811dc4bc0cac4e9f1a2d607f8bf2ddc08a9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hash-table.h"
namespace impala {
inline bool HashTableCtx::EvalAndHashBuild(const TupleRow* row) {
uint8_t* expr_values = expr_values_cache_.cur_expr_values();
uint8_t* expr_values_null = expr_values_cache_.cur_expr_values_null();
bool has_null = EvalBuildRow(row, expr_values, expr_values_null);
if (!stores_nulls() && has_null) return false;
expr_values_cache_.SetCurExprValuesHash(HashRow(expr_values, expr_values_null));
return true;
inline bool HashTableCtx::EvalAndHashProbe(const TupleRow* row) {
uint8_t* expr_values = expr_values_cache_.cur_expr_values();
uint8_t* expr_values_null = expr_values_cache_.cur_expr_values_null();
bool has_null = EvalProbeRow(row, expr_values, expr_values_null);
if (has_null && !(stores_nulls() && finds_some_nulls())) return false;
expr_values_cache_.SetCurExprValuesHash(HashRow(expr_values, expr_values_null));
return true;
inline void HashTableCtx::ExprValuesCache::NextRow() {
cur_expr_values_ += expr_values_bytes_per_row_;
cur_expr_values_null_ += num_exprs_;
DCHECK_LE(cur_expr_values_hash_ - expr_values_hash_array_.get(), capacity_);
template <bool INCLUSIVE_EQUALITY, bool COMPARE_ROW, HashTable::BucketType TYPE>
inline int64_t HashTable::Probe(Bucket* buckets, uint32_t* hash_array,
int64_t num_buckets, HashTableCtx* __restrict__ ht_ctx, uint32_t hash, bool* found,
BucketData* bd) {
DCHECK(ht_ctx != nullptr);
DCHECK(buckets != nullptr);
DCHECK_GT(num_buckets, 0);
*found = false;
int64_t bucket_idx = hash & (num_buckets - 1);
// In case of linear probing it counts the total number of steps for statistics and
// for knowing when to exit the loop (e.g. by capping the total travel length). In case
// of quadratic probing it is also used for calculating the length of the next jump.
int64_t step = 0;
do {
Bucket* bucket = &buckets[bucket_idx];
if (LIKELY(!bucket->IsFilled())) return bucket_idx;
if (hash == hash_array[bucket_idx]) {
&& ht_ctx->Equals<INCLUSIVE_EQUALITY>(
GetRow<TYPE>(bucket, ht_ctx->scratch_row_, bd))) {
*found = true;
return bucket_idx;
// Row equality failed, or not performed. This is a hash collision. Continue
// searching.
// Move to the next bucket.
if (quadratic_probing()) {
// The i-th probe location is idx = (hash + (step * (step + 1)) / 2) mod
// num_buckets. This gives num_buckets unique idxs (between 0 and N-1) when
// num_buckets is a power of 2.
bucket_idx = (bucket_idx + step) & (num_buckets - 1);
} else {
// Linear probing
bucket_idx = (bucket_idx + 1) & (num_buckets - 1);
} while (LIKELY(step < num_buckets));
ht_ctx->travel_length_ += step;
DCHECK_EQ(num_filled_buckets_, num_buckets)
<< "Probing of a non-full table "
<< "failed: " << quadratic_probing() << " " << hash;
return Iterator::BUCKET_NOT_FOUND;
inline HashTable::Bucket* HashTable::InsertInternal(
HashTableCtx* __restrict__ ht_ctx, Status* status) {
bool found = false;
uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
BucketData bd;
int64_t bucket_idx =
Probe<true, true>(buckets_, hash_array_, num_buckets_, ht_ctx, hash, &found, &bd);
DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND);
if (found) {
// We need to insert a duplicate node, note that this may fail to allocate memory.
DuplicateNode* new_node = InsertDuplicateNode(bucket_idx, status, &bd);
if (UNLIKELY(new_node == NULL)) return NULL;
} else {
PrepareBucketForInsert(bucket_idx, hash);
return &buckets_[bucket_idx];
inline bool HashTable::Insert(HashTableCtx* __restrict__ ht_ctx,
BufferedTupleStream::FlatRowPtr flat_row, TupleRow* row, Status* status) {
Bucket* bucket = InsertInternal(ht_ctx, status);
if (UNLIKELY(bucket == NULL)) return false;
// If successful insert, update the contents of the newly inserted entry with 'idx'.
if (bucket->HasDuplicates()) {
DuplicateNode* node = bucket->GetDuplicate();
if (UNLIKELY(node == NULL)) return false;
if (stores_tuples()) {
node->htdata.tuple = row->GetTuple(0);
} else {
node->htdata.flat_row = flat_row;
} else {
if (stores_tuples()) {
Tuple* tuple = row->GetTuple(0);
} else {
return true;
template <const bool READ>
inline void HashTable::PrefetchBucket(uint32_t hash) {
int64_t bucket_idx = hash & (num_buckets_ - 1);
// Two optional arguments:
// 'rw': 1 means the memory access is write
// 'locality': 0-3. 0 means no temporal locality. 3 means high temporal locality.
// On x86, they map to instructions prefetchnta and prefetch{2-0} respectively.
// TODO: Reconsider the locality level with smaller prefetch batch size.
__builtin_prefetch(&buckets_[bucket_idx], READ ? 0 : 1, 1);
__builtin_prefetch(&hash_array_[bucket_idx], READ ? 0 : 1, 1);
inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* __restrict__ ht_ctx) {
bool found = false;
uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
BucketData bd;
int64_t bucket_idx =
Probe<false, true>(buckets_, hash_array_, num_buckets_, ht_ctx, hash, &found, &bd);
if (found) {
return Iterator(this, ht_ctx->scratch_row(), bucket_idx,
stores_duplicates() ? bd.duplicates : NULL);
return End();
// TODO: support lazy evaluation like HashTable::Insert().
template <HashTable::BucketType TYPE>
inline HashTable::Iterator HashTable::FindBuildRowBucket(
HashTableCtx* __restrict__ ht_ctx, bool* found) {
uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
BucketData bd;
int64_t bucket_idx = Probe<true, true, TYPE>(
buckets_, hash_array_, num_buckets_, ht_ctx, hash, found, &bd);
DuplicateNode* duplicates = NULL;
if (stores_duplicates() && LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND)) {
duplicates = bd.duplicates;
return Iterator(this, ht_ctx->scratch_row(), bucket_idx, duplicates);
inline HashTable::Iterator HashTable::Begin(const HashTableCtx* ctx) {
int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND;
DuplicateNode* node = NULL;
NextFilledBucket(&bucket_idx, &node);
return Iterator(this, ctx->scratch_row(), bucket_idx, node);
inline HashTable::Iterator HashTable::FirstUnmatched(HashTableCtx* ctx) {
int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND;
DuplicateNode* node = NULL;
NextFilledBucket(&bucket_idx, &node);
Iterator it(this, ctx->scratch_row(), bucket_idx, node);
// Check whether the bucket, or its first duplicate node, is matched. If it is not
// matched, then return. Otherwise, move to the first unmatched entry (node or bucket).
Bucket* bucket = &buckets_[bucket_idx];
bool has_duplicates = stores_duplicates() && bucket->HasDuplicates();
if ((!has_duplicates && bucket->IsMatched()) || (has_duplicates && node->IsMatched())) {
return it;
inline void HashTable::NextFilledBucket(int64_t* bucket_idx, DuplicateNode** node) {
for (; *bucket_idx < num_buckets_; ++*bucket_idx) {
if (buckets_[*bucket_idx].IsFilled()) {
*node = stores_duplicates() ? buckets_[*bucket_idx].GetDuplicate() : NULL;
// Reached the end of the hash table.
*bucket_idx = Iterator::BUCKET_NOT_FOUND;
*node = NULL;
inline void HashTable::PrepareBucketForInsert(int64_t bucket_idx, uint32_t hash) {
DCHECK_GE(bucket_idx, 0);
DCHECK_LT(bucket_idx, num_buckets_);
Bucket* bucket = &buckets_[bucket_idx];
hash_array_[bucket_idx] = hash;
inline HashTable::DuplicateNode* HashTable::AppendNextNode(Bucket* bucket) {
DCHECK_GT(node_remaining_current_page_, 0);
return next_node_++;
inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(
int64_t bucket_idx, Status* status, BucketData* bucket_data) {
DCHECK_GE(bucket_idx, 0);
DCHECK_LT(bucket_idx, num_buckets_);
Bucket* bucket = &buckets_[bucket_idx];
bool has_duplicates = bucket->HasDuplicates();
// Allocate one duplicate node for the new data and one for the preexisting data,
// if needed.
while (node_remaining_current_page_ < 1 + !has_duplicates) {
if (UNLIKELY(!GrowNodeArray(status))) return NULL;
if (!has_duplicates) {
// This is the first duplicate in this bucket. It means that we need to convert
// the current entry in the bucket to a node and link it from the bucket.
next_node_->htdata.flat_row = bucket_data->htdata.flat_row;
// Link a new node and UnsetMatched
return AppendNextNode(bucket);
inline TupleRow* IR_ALWAYS_INLINE HashTable::GetRow(HtData& htdata, TupleRow* row) const {
if (stores_tuples()) {
row->SetTuple(0, htdata.tuple);
// return reinterpret_cast<TupleRow*>(&htdata.tuple);
return row;
} else {
// TODO: GetTupleRow() has interpreted code that iterates over the row's descriptor.
tuple_stream_->GetTupleRow(htdata.flat_row, row);
return row;
template <HashTable::BucketType TYPE>
inline TupleRow* IR_ALWAYS_INLINE HashTable::GetRow(
Bucket* bucket, TupleRow* row, BucketData* bucket_data) const {
DCHECK(bucket != NULL);
if (UNLIKELY(stores_duplicates() && bucket->HasDuplicates())) {
*bucket_data = bucket->GetBucketData();
DuplicateNode* duplicate = bucket_data->duplicates;
DCHECK(duplicate != NULL);
return GetRow(duplicate->htdata, row);
} else {
*bucket_data = bucket->GetBucketData<TYPE>();
return GetRow(bucket_data->htdata, row);
inline TupleRow* IR_ALWAYS_INLINE HashTable::Iterator::GetRow() const {
DCHECK(table_ != NULL);
DCHECK(scratch_row_ != NULL);
Bucket* bucket = &table_->buckets_[bucket_idx_];
if (UNLIKELY(table_->stores_duplicates() && bucket->HasDuplicates())) {
DCHECK(node_ != NULL);
return table_->GetRow(node_->htdata, scratch_row_);
} else {
HtData htdata = bucket->GetBucketData().htdata;
return table_->GetRow(htdata, scratch_row_);
template <HashTable::BucketType TYPE>
inline Tuple* IR_ALWAYS_INLINE HashTable::Iterator::GetTuple() const {
Bucket* bucket = &table_->buckets_[bucket_idx_];
// TODO: To avoid the has_duplicates check, store the HtData* in the Iterator.
if (UNLIKELY(table_->stores_duplicates() && bucket->HasDuplicates())) {
DCHECK(node_ != NULL);
return node_->htdata.tuple;
} else {
return bucket->GetTuple<TYPE>();
inline void HashTable::Iterator::SetTuple(Tuple* tuple, uint32_t hash) {
table_->PrepareBucketForInsert(bucket_idx_, hash);
inline void HashTable::Iterator::SetMatched() {
Bucket* bucket = &table_->buckets_[bucket_idx_];
if (table_->stores_duplicates() && bucket->HasDuplicates()) {
} else {
// Used for disabling spilling of hash tables in right and full-outer joins with
// matches. See IMPALA-1488.
table_->has_matches_ = true;
inline bool HashTable::Iterator::IsMatched() const {
Bucket* bucket = &table_->buckets_[bucket_idx_];
if (table_->stores_duplicates() && bucket->HasDuplicates()) {
return node_->IsMatched();
return bucket->IsMatched();
inline void HashTable::Iterator::SetAtEnd() {
bucket_idx_ = BUCKET_NOT_FOUND;
node_ = NULL;
template <const bool READ>
inline void HashTable::Iterator::PrefetchBucket() {
if (LIKELY(!AtEnd())) {
// HashTable::PrefetchBucket() takes a hash value to index into the hash bucket
// array. Passing 'bucket_idx_' here is sufficient.
DCHECK_EQ((bucket_idx_ & ~(table_->num_buckets_ - 1)), 0);
inline void HashTable::Iterator::Next() {
if (table_->stores_duplicates() && table_->buckets_[bucket_idx_].HasDuplicates()
&& node_->Next() != NULL) {
node_ = node_->Next();
} else {
table_->NextFilledBucket(&bucket_idx_, &node_);
inline void HashTable::Iterator::NextDuplicate() {
if (table_->stores_duplicates() && table_->buckets_[bucket_idx_].HasDuplicates()
&& node_->Next() != NULL) {
node_ = node_->Next();
} else {
bucket_idx_ = BUCKET_NOT_FOUND;
node_ = NULL;
inline void HashTable::Iterator::NextUnmatched() {
Bucket* bucket = &table_->buckets_[bucket_idx_];
// Check if there is any remaining unmatched duplicate node in the current bucket.
if (table_->stores_duplicates() && bucket->HasDuplicates()) {
auto next_node = node_->Next();
while (next_node != NULL) {
node_ = next_node;
if (!node_->IsMatched()) return;
next_node = next_node->Next();
// Move to the next filled bucket and return if this bucket is not matched or
// iterate to the first not matched duplicate node.
table_->NextFilledBucket(&bucket_idx_, &node_);
while (bucket_idx_ != Iterator::BUCKET_NOT_FOUND) {
bucket = &table_->buckets_[bucket_idx_];
if (!table_->stores_duplicates() || !bucket->HasDuplicates()) {
if (!bucket->IsMatched()) return;
} else {
auto next_node = node_->Next();
while (node_->IsMatched() && next_node != NULL) {
node_ = next_node;
next_node = next_node->Next();
if (!node_->IsMatched()) return;
table_->NextFilledBucket(&bucket_idx_, &node_);
inline void HashTableCtx::set_level(int level) {
DCHECK_GE(level, 0);
DCHECK_LT(level, seeds_.size());
level_ = level;
inline int64_t HashTable::CurrentMemSize() const {
return num_buckets_ * (sizeof(Bucket) + sizeof(uint32_t))
+ num_duplicate_nodes_ * sizeof(DuplicateNode);
inline int64_t HashTable::NumInsertsBeforeResize() const {
return std::max<int64_t>(
0, static_cast<int64_t>(num_buckets_ * MAX_FILL_FACTOR) - num_filled_buckets_);
} // namespace impala