blob: dcff33d6474dd542c4842492109de5c3cc66c7e9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/partitioned-hash-join-builder.h"
#include <numeric>
#include <gutil/strings/substitute.h>
#include "codegen/llvm-codegen.h"
#include "exec/hash-table.inline.h"
#include "exprs/expr-context.h"
#include "exprs/expr.h"
#include "runtime/buffered-tuple-stream.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-filter-bank.h"
#include "runtime/runtime-filter.h"
#include "runtime/runtime-state.h"
#include "util/bloom-filter.h"
#include "util/runtime-profile-counters.h"
#include "gen-cpp/PlanNodes_types.h"
#include "common/names.h"
// Error message template reported when the initial read buffer for a build stream
// cannot be acquired while repartitioning. The '$0' placeholder is substituted with
// the id of the hash join node.
static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
"Failed to acquire initial read "
"buffer for stream in hash join node $0. Reducing query concurrency or increasing "
"the memory limit may help this query to complete successfully.";
using namespace impala;
using namespace llvm;
using namespace strings;
using std::unique_ptr;
// Constructs a builder for the hash join identified by 'join_node_id'. Only records the
// arguments; every counter, timer, partition pointer and codegen'd function pointer
// starts out null and is filled in later (Prepare(), Open(), Codegen*()).
PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op,
    const RowDescriptor& probe_row_desc, const RowDescriptor& build_row_desc,
    RuntimeState* state)
  : DataSink(build_row_desc),
    runtime_state_(state),
    join_node_id_(join_node_id),
    join_op_(join_op),
    probe_row_desc_(probe_row_desc),
    block_mgr_client_(nullptr),
    non_empty_build_(false),
    // Profile counters.
    partitions_created_(nullptr),
    largest_partition_percent_(nullptr),
    max_partition_level_(nullptr),
    num_build_rows_partitioned_(nullptr),
    num_hash_collisions_(nullptr),
    num_hash_buckets_(nullptr),
    num_spilled_partitions_(nullptr),
    num_repartitions_(nullptr),
    // Profile timers.
    partition_build_rows_timer_(nullptr),
    build_hash_table_timer_(nullptr),
    repartition_timer_(nullptr),
    null_aware_partition_(nullptr),
    // Codegen'd entry points; null means "use the interpreted path".
    process_build_batch_fn_(nullptr),
    process_build_batch_fn_level0_(nullptr),
    insert_batch_fn_(nullptr),
    insert_batch_fn_level0_(nullptr) {
}
// Creates the build-side expression tree for every equi-join conjunct and registers
// the runtime filters that this builder will produce. Returns the first error hit
// while creating an expression tree.
Status PhjBuilder::Init(RuntimeState* state,
    const vector<TEqJoinCondition>& eq_join_conjuncts,
    const vector<TRuntimeFilterDesc>& filters) {
  for (const TEqJoinCondition& conjunct : eq_join_conjuncts) {
    ExprContext* build_ctx;
    RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, conjunct.right, &build_ctx));
    build_expr_ctxs_.push_back(build_ctx);
    is_not_distinct_from_.push_back(conjunct.is_not_distinct_from);
  }

  for (const TRuntimeFilterDesc& filter_desc : filters) {
    // Unless filter propagation is GLOBAL, only filters built by broadcast joins (which
    // may be consumed by this fragment) are considered.
    const bool global_mode =
        state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL;
    if (!global_mode && !filter_desc.is_broadcast_join) continue;

    // With row-level filtering disabled, keep only filters on partition columns.
    const bool row_filtering_disabled =
        state->query_options().disable_row_runtime_filtering;
    if (row_filtering_disabled && !filter_desc.applied_on_partition_columns) continue;

    FilterContext filter_ctx;
    filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, true);
    RETURN_IF_ERROR(
        Expr::CreateExprTree(&pool_, filter_desc.src_expr, &filter_ctx.expr_ctx));
    filters_.push_back(filter_ctx);
  }
  return Status::OK();
}
// Returns a human-readable name for this sink that embeds the join node id.
string PhjBuilder::GetName() {
  string name = Substitute("Hash Join Builder (join_node_id=$0)", join_node_id_);
  return name;
}
// Prepares the build and filter expressions, creates the hash table context, registers
// a block manager client and sets up all profile counters and timers. Also records in
// the profile when codegen was disabled by query option or by hint.
Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));

  // Build expressions: prepare them and remember them for local-allocation cleanup.
  RETURN_IF_ERROR(
      Expr::Prepare(build_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));
  for (ExprContext* build_ctx : build_expr_ctxs_) {
    expr_ctxs_to_free_.push_back(build_ctx);
  }

  // Runtime filter source expressions get the same treatment.
  for (size_t i = 0; i < filters_.size(); ++i) {
    ExprContext* filter_expr = filters_[i].expr_ctx;
    RETURN_IF_ERROR(filter_expr->Prepare(state, row_desc_, expr_mem_tracker_.get()));
    expr_ctxs_to_free_.push_back(filter_expr);
  }

  // The build expressions are passed for both expression arguments of the context.
  RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, build_expr_ctxs_,
      HashTableStoresNulls(), is_not_distinct_from_, state->fragment_hash_seed(),
      MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), mem_tracker_.get(),
      &ht_ctx_));

  const string client_name =
      Substitute("PartitionedHashJoin id=$0 builder=$1", join_node_id_, this);
  RETURN_IF_ERROR(state->block_mgr()->RegisterClient(client_name, MinRequiredBuffers(),
      true, mem_tracker_.get(), state, &block_mgr_client_));

  // Counters.
  partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
  largest_partition_percent_ =
      profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
  max_partition_level_ =
      profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT);
  num_build_rows_partitioned_ =
      ADD_COUNTER(profile(), "BuildRowsPartitioned", TUnit::UNIT);
  num_hash_collisions_ = ADD_COUNTER(profile(), "HashCollisions", TUnit::UNIT);
  num_hash_buckets_ = ADD_COUNTER(profile(), "HashBuckets", TUnit::UNIT);
  num_spilled_partitions_ = ADD_COUNTER(profile(), "SpilledPartitions", TUnit::UNIT);
  num_repartitions_ = ADD_COUNTER(profile(), "NumRepartitions", TUnit::UNIT);

  // Timers.
  partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime");
  build_hash_table_timer_ = ADD_TIMER(profile(), "HashTablesBuildTime");
  repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime");

  // Explain in the profile why codegen is off, if it is. The query option takes
  // precedence over the hint.
  if (state->CodegenDisabledByQueryOption()) {
    profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
    return Status::OK();
  }
  if (state->CodegenDisabledByHint()) {
    profile()->AddCodegenMsg(false, "disabled due to optimization hints");
  }
  return Status::OK();
}
// Opens the build and filter expressions, creates the level-0 hash partitions and
// allocates the runtime filters. For a null-aware left anti join an additional
// level-0 partition is created and stored in 'null_aware_partition_'.
Status PhjBuilder::Open(RuntimeState* state) {
  RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
  for (size_t i = 0; i < filters_.size(); ++i) {
    RETURN_IF_ERROR(filters_[i].expr_ctx->Open(state));
  }

  RETURN_IF_ERROR(CreateHashPartitions(0));
  AllocateRuntimeFilters();

  if (join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) return Status::OK();
  return CreateAndPreparePartition(0, &null_aware_partition_);
}
// Partitions the rows of 'batch' into the current hash partitions, using the codegen'd
// routine when one is available. Runtime filters are only built while the hash table
// context is at level 0 and at least one filter is registered.
Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) {
  SCOPED_TIMER(partition_build_rows_timer_);
  const bool at_level0 = ht_ctx_->level() == 0;
  const bool build_filters = at_level0 && filters_.size() > 0;

  if (process_build_batch_fn_ != NULL) {
    // Codegen'd path: a separate function exists for level 0.
    DCHECK(process_build_batch_fn_level0_ != NULL);
    if (at_level0) {
      RETURN_IF_ERROR(
          process_build_batch_fn_level0_(this, batch, ht_ctx_.get(), build_filters));
    } else {
      RETURN_IF_ERROR(
          process_build_batch_fn_(this, batch, ht_ctx_.get(), build_filters));
    }
  } else {
    // Interpreted path.
    RETURN_IF_ERROR(ProcessBuildBatch(batch, ht_ctx_.get(), build_filters));
  }

  // Release any expression-local allocations made while partitioning this batch.
  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
  COUNTER_ADD(num_build_rows_partitioned_, batch->num_rows());
  return Status::OK();
}
// Called once all build rows of the current input have been partitioned. Updates the
// "largest partition" counter, optionally logs the partition distribution, publishes
// runtime filters (level 0 only) and finally builds hash tables / probe streams.
Status PhjBuilder::FlushFinal(RuntimeState* state) {
  // Total number of build rows across all hash partitions.
  int64_t total_rows = 0;
  for (size_t i = 0; i < hash_partitions_.size(); ++i) {
    total_rows += hash_partitions_[i]->build_rows()->num_rows();
  }

  // Record the share of the rows that landed in the largest partition.
  if (total_rows > 0) {
    double top_fraction = 0.0;
    for (size_t i = 0; i < hash_partitions_.size(); ++i) {
      const double fraction = hash_partitions_[i]->build_rows()->num_rows()
          / static_cast<double>(total_rows);
      if (fraction > top_fraction) top_fraction = fraction;
    }
    COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(top_fraction * 100));
  }

  if (VLOG_IS_ON(2)) {
    stringstream msg;
    msg << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:", join_node_id_,
        hash_partitions_[0]->level(), total_rows);
    int idx = 0;
    for (Partition* part : hash_partitions_) {
      double percent;
      if (total_rows == 0) {
        percent = 0.0;
      } else {
        percent =
            part->build_rows()->num_rows() * 100 / static_cast<double>(total_rows);
      }
      msg << " " << idx << " " << (part->is_spilled() ? "spilled" : "not spilled")
          << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
          << " #rows:" << part->build_rows()->num_rows() << endl;
      ++idx;
    }
    VLOG(2) << msg.str();
  }

  // Filters are only published for the initial (level 0) build.
  if (ht_ctx_->level() == 0) {
    PublishRuntimeFilters(total_rows);
    non_empty_build_ |= (total_rows > 0);
  }

  return BuildHashTablesAndPrepareProbeStreams();
}
// Releases everything owned by the builder: partitions, the hash table context, the
// expressions, block manager reservations and the object pool. Calling it again after
// the first call is a no-op.
void PhjBuilder::Close(RuntimeState* state) {
  if (closed_) return;

  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
  CloseAndDeletePartitions();
  if (ht_ctx_ != NULL) {
    ht_ctx_->Close();
  }

  Expr::Close(build_expr_ctxs_, state);
  for (size_t i = 0; i < filters_.size(); ++i) {
    filters_[i].expr_ctx->Close(state);
  }

  // The client is only set once Prepare() has registered it.
  if (block_mgr_client_ != NULL) {
    state->block_mgr()->ClearReservations(block_mgr_client_);
  }

  pool_.Clear();
  DataSink::Close(state);
  closed_ = true;
}
// Returns the builder to its pre-build state: forgets that a non-empty build was seen,
// frees expression-local allocations and drops all partitions and probe streams.
void PhjBuilder::Reset() {
  non_empty_build_ = false;
  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
  CloseAndDeletePartitions();
}
// Creates a new partition at 'level', owned by 'all_partitions_', and prepares its
// build stream for writing. '*partition' is set before any fallible step runs. Returns
// a mem-limit-too-low error if the stream could not get its initial write buffer.
Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) {
  all_partitions_.emplace_back(new Partition(runtime_state_, this, level));
  Partition* created = all_partitions_.back().get();
  *partition = created;

  BufferedTupleStream* rows = created->build_rows();
  RETURN_IF_ERROR(rows->Init(join_node_id_, profile(), true));

  bool got_buffer;
  RETURN_IF_ERROR(rows->PrepareForWrite(&got_buffer));
  if (got_buffer) return Status::OK();

  return runtime_state_->block_mgr()->MemLimitTooLowError(
      block_mgr_client_, join_node_id_);
}
// Fills 'hash_partitions_' (which must be empty) with PARTITION_FANOUT new partitions
// at 'level' and switches the hash table context to that level.
Status PhjBuilder::CreateHashPartitions(int level) {
  DCHECK(hash_partitions_.empty());
  // The level selects the hash function used to partition the input.
  ht_ctx_->set_level(level);

  int remaining = PARTITION_FANOUT;
  while (remaining > 0) {
    Partition* created;
    RETURN_IF_ERROR(CreateAndPreparePartition(level, &created));
    hash_partitions_.push_back(created);
    --remaining;
  }

  COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
  COUNTER_SET(max_partition_level_, level);
  return Status::OK();
}
// Slow path for appending 'row' to 'stream'. Repeatedly tries to make room, first by
// switching the stream from small buffers to IO buffers and then by spilling
// partitions, until the row is appended. Returns true on success; returns false with
// '*status' set to the error otherwise.
bool PhjBuilder::AppendRowStreamFull(
    BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept {
  for (;;) {
    // A stream still on small buffers may be able to grow into IO-sized buffers.
    if (stream->using_small_buffers()) {
      bool switched;
      *status = stream->SwitchToIoBuffers(&switched);
      if (!status->ok()) return false;
      if (switched) {
        if (LIKELY(stream->AddRow(row, status))) return true;
        if (!status->ok()) return false;
      }
    }

    // Out of memory: spill a partition. Once no unspilled partition is left,
    // SpillPartition() reports an error, which ends the loop.
    *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
    if (!status->ok()) return false;

    const bool appended = stream->AddRow(row, status);
    if (appended) return true;
    if (!status->ok()) return false;
    // One spill is not guaranteed to free enough memory; go around again.
  }
}
// Spills the open, unspilled hash partition that holds the most memory (in-memory
// build rows plus hash table, if any), unpinning its stream with 'mode'. Returns a
// mem-limit-too-low error when no partition qualifies.
// TODO: can we do better with a different spilling heuristic?
Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) {
  DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);

  int victim_idx = -1;
  int64_t victim_mem = 0;
  for (int i = 0; i < PARTITION_FANOUT; ++i) {
    Partition* part = hash_partitions_[i];
    if (part->IsClosed() || part->is_spilled()) continue;

    int64_t reclaimable = part->build_rows()->bytes_in_mem(false);
    if (part->hash_tbl() != NULL) {
      // Nothing has probed the table yet, so it must not carry match info; dropping
      // match info would produce wrong results (IMPALA-1488).
      DCHECK(!part->hash_tbl()->HasMatches());
      reclaimable += part->hash_tbl()->ByteSize();
    }

    // Strictly greater: ties keep the earlier partition, and a partition with nothing
    // to free is never selected.
    if (reclaimable <= victim_mem) continue;
    victim_mem = reclaimable;
    victim_idx = i;
  }

  if (victim_idx == -1) {
    // Nothing left to spill, so the memory limit is simply too low.
    return runtime_state_->block_mgr()->MemLimitTooLowError(
        block_mgr_client_, join_node_id_);
  }

  VLOG(2) << "Spilling partition: " << victim_idx << endl << DebugString();
  return hash_partitions_[victim_idx]->Spill(mode);
}
// When this function is called, we've finished processing the current build input
// (either from the child ExecNode or from repartitioning a spilled partition). The build
// rows have only been partitioned, we still need to build hash tables over them. Some
// of the partitions could have already been spilled and attempting to build hash
// tables over the non-spilled ones can cause them to spill.
//
// At the end of the function all partitions either have a hash table (and therefore are
// not spilled) or are spilled. Partitions that have hash tables don't need to spill on
// the probe side.
//
// This maps perfectly to a 0-1 knapsack where the weight is the memory to keep the
// build rows and hash table and the value is the expected IO savings.
// For now, we go with a greedy solution.
//
// The work is done in four passes over 'hash_partitions_':
//  1. close empty partitions and fully unpin already-spilled ones;
//  2. create probe streams for the partitions spilled so far;
//  3. build a hash table for each remaining partition, spilling it if that fails;
//  4. create probe streams for any partitions spilled during pass 3.
// Returns the first error encountered by any of these steps.
//
// TODO: implement the knapsack solution.
Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
// Pass 1: release memory that is certainly not needed.
for (int i = 0; i < PARTITION_FANOUT; ++i) {
Partition* partition = hash_partitions_[i];
if (partition->build_rows()->num_rows() == 0) {
// This partition is empty, no need to do anything else.
partition->Close(NULL);
} else if (partition->is_spilled()) {
// We don't need any build-side data for spilled partitions in memory.
RETURN_IF_ERROR(
partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
}
}
// Pass 2:
// Allocate probe buffers for all partitions that are already spilled. Do this before
// building hash tables because allocating probe buffers may cause some more partitions
// to be spilled. This avoids wasted work on building hash tables for partitions that
// won't fit in memory alongside the required probe buffers.
RETURN_IF_ERROR(InitSpilledPartitionProbeStreams());
// Pass 3: build hash tables over every partition that is still open and in memory.
for (int i = 0; i < PARTITION_FANOUT; ++i) {
Partition* partition = hash_partitions_[i];
if (partition->IsClosed() || partition->is_spilled()) continue;
bool built = false;
DCHECK(partition->build_rows()->is_pinned());
RETURN_IF_ERROR(partition->BuildHashTable(&built));
// If we did not have enough memory to build this hash table, we need to spill this
// partition (clean up the hash table, unpin build).
if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL));
}
// Pass 4:
// We may have spilled additional partitions while building hash tables, we need to
// initialize probe buffers for them.
// TODO: once we have reliable reservations (IMPALA-3200) this should no longer be
// necessary: we will know exactly how many partitions will fit in memory and we can
// avoid building then immediately destroying hash tables.
RETURN_IF_ERROR(InitSpilledPartitionProbeStreams());
// TODO: at this point we could have freed enough memory to pin and build some
// spilled partitions. This can happen, for example if there is a lot of skew.
// Partition 1: 10GB (pinned initially).
// Partition 2,3,4: 1GB (spilled during partitioning the build).
// In the previous step, we could have unpinned 10GB (because there was not enough
// memory to build a hash table over it) which can now free enough memory to
// build hash tables over the remaining 3 partitions.
// We start by spilling the largest partition though so the build input would have
// to be pretty pathological.
// Investigate if this is worthwhile.
return Status::OK();
}
// Ensures 'spilled_partition_probe_streams_' holds one write-ready probe stream per
// open, spilled hash partition. Streams that already exist are kept; only the missing
// ones are created. Obtaining a write buffer for a new stream may require spilling
// further partitions, each of which in turn needs a probe stream of its own.
// Returns an error if a stream operation fails or if SpillPartition() fails.
Status PhjBuilder::InitSpilledPartitionProbeStreams() {
DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
// Count the partitions that currently need a probe stream.
int num_spilled_partitions = 0;
for (int i = 0; i < PARTITION_FANOUT; ++i) {
Partition* partition = hash_partitions_[i];
if (!partition->IsClosed() && partition->is_spilled()) ++num_spilled_partitions;
}
// Number of streams still missing; may grow below when more partitions get spilled.
int probe_streams_to_create =
num_spilled_partitions - spilled_partition_probe_streams_.size();
while (probe_streams_to_create > 0) {
// Create stream in vector, so that it will be cleaned up after any failure.
spilled_partition_probe_streams_.emplace_back(std::make_unique<BufferedTupleStream>(
runtime_state_, probe_row_desc_, runtime_state_->block_mgr(), block_mgr_client_,
false /* use_initial_small_buffers */, false /* read_write */));
BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get();
RETURN_IF_ERROR(probe_stream->Init(join_node_id_, profile(), false));
// Loop until either the stream gets a buffer or all partitions are spilled (in which
// case SpillPartition() returns an error).
while (true) {
bool got_buffer;
RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer));
if (got_buffer) break;
RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL));
// The partition that was just spilled needs a probe stream as well.
++probe_streams_to_create;
}
--probe_streams_to_create;
}
return Status::OK();
}
// Hands ownership of the probe streams created for spilled partitions to the caller
// by moving them out of 'spilled_partition_probe_streams_'.
vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() {
  vector<unique_ptr<BufferedTupleStream>> streams(
      std::move(spilled_partition_probe_streams_));
  return streams;
}
void PhjBuilder::CloseAndDeletePartitions() {
// Close all the partitions and clean up all references to them.
for (unique_ptr<Partition>& partition : all_partitions_) partition->Close(NULL);
all_partitions_.clear();
hash_partitions_.clear();
null_aware_partition_ = NULL;
for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) {
stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
}
spilled_partition_probe_streams_.clear();
}
void PhjBuilder::AllocateRuntimeFilters() {
DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filters_.size() == 0)
<< "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
DCHECK(ht_ctx_ != NULL);
for (int i = 0; i < filters_.size(); ++i) {
filters_[i].local_bloom_filter =
runtime_state_->filter_bank()->AllocateScratchBloomFilter(
filters_[i].filter->id());
}
}
void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
int32_t num_enabled_filters = 0;
// Use 'num_build_rows' to estimate FP-rate of each Bloom filter, and publish
// 'always-true' filters if it's too high. Doing so saves CPU at the coordinator,
// serialisation time, and reduces the cost of applying the filter at the scan - most
// significantly for per-row filters. However, the number of build rows could be a very
// poor estimate of the NDV - particularly if the filter expression is a function of
// several columns.
// TODO: Better heuristic.
for (const FilterContext& ctx : filters_) {
// TODO: Consider checking actual number of bits set in filter to compute FP rate.
// TODO: Consider checking this every few batches or so.
bool fp_rate_too_high = runtime_state_->filter_bank()->FpRateTooHigh(
ctx.filter->filter_size(), num_build_rows);
runtime_state_->filter_bank()->UpdateFilterFromLocal(ctx.filter->id(),
fp_rate_too_high ? BloomFilter::ALWAYS_TRUE_FILTER : ctx.local_bloom_filter);
num_enabled_filters += !fp_rate_too_high;
}
if (filters_.size() > 0) {
string info_string;
if (num_enabled_filters == filters_.size()) {
info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(),
filters_.size() == 1 ? "" : "s");
} else {
info_string = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s",
filters_.size() - num_enabled_filters);
}
profile()->AddInfoString("Runtime filters", info_string);
}
}
// Repartitions the build rows of 'input_partition' into a new set of hash partitions at
// 'level' (which must be >= 1), closes 'input_partition', makes sure
// 'input_probe_rows' has a read buffer, and then finishes the build via FlushFinal().
//
// Returns an error if a read buffer for the build rows cannot be acquired, if the query
// is cancelled or in an error state, or if any stream/partition operation fails.
Status PhjBuilder::RepartitionBuildInput(
    Partition* input_partition, int level, BufferedTupleStream* input_probe_rows) {
  DCHECK_GE(level, 1);
  SCOPED_TIMER(repartition_timer_);
  COUNTER_ADD(num_repartitions_, 1);
  RuntimeState* state = runtime_state_;

  // Setup the read buffer and the new partitions.
  BufferedTupleStream* build_rows = input_partition->build_rows();
  DCHECK(build_rows != NULL);
  bool got_read_buffer;
  RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer));
  if (!got_read_buffer) {
    return mem_tracker()->MemLimitExceeded(
        state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, join_node_id_));
  }
  RETURN_IF_ERROR(CreateHashPartitions(level));

  // Repartition 'build_rows' into 'hash_partitions_'.
  RowBatch build_batch(row_desc_, state->batch_size(), mem_tracker());
  bool eos = false;
  while (!eos) {
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(state->CheckQueryState());
    RETURN_IF_ERROR(build_rows->GetNext(&build_batch, &eos));
    RETURN_IF_ERROR(Send(state, &build_batch));
    build_batch.Reset();
  }

  // Done reading the input, we can safely close it now to free memory.
  input_partition->Close(NULL);

  // We just freed up the buffer used for reading build rows. Ensure a buffer is
  // allocated for reading probe rows before we build the hash tables in FlushFinal().
  // TODO: once we have reliable reservations (IMPALA-3200) we can just hand off the
  // reservation and avoid this complication.
  while (true) {
    // Initialized so that the flag is well-defined on every path.
    bool got_buffer = false;
    // The returned status must be propagated: dropping it would lose the error and
    // leave the loop deciding on a flag the callee may never have written.
    RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer));
    if (got_buffer) break;
    // No buffer yet: free memory by spilling and retry. SpillPartition() returns an
    // error once nothing is left to spill, which terminates the loop.
    RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
  }

  RETURN_IF_ERROR(FlushFinal(state));
  return Status::OK();
}
int64_t PhjBuilder::LargestPartitionRows() const {
int64_t max_rows = 0;
for (int i = 0; i < hash_partitions_.size(); ++i) {
Partition* partition = hash_partitions_[i];
DCHECK(partition != NULL);
if (partition->IsClosed()) continue;
int64_t rows = partition->build_rows()->num_rows();
if (rows > max_rows) max_rows = rows;
}
return max_rows;
}
// Returns true for right outer, right anti and full outer joins, and whenever at
// least one join conjunct is flagged in 'is_not_distinct_from_'.
bool PhjBuilder::HashTableStoresNulls() const {
  if (join_op_ == TJoinOp::RIGHT_OUTER_JOIN) return true;
  if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN) return true;
  if (join_op_ == TJoinOp::FULL_OUTER_JOIN) return true;
  for (bool not_distinct : is_not_distinct_from_) {
    if (not_distinct) return true;
  }
  return false;
}
// Creates an unspilled partition at 'level' together with its build-row stream.
PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level)
  : parent_(parent), is_spilled_(false), level_(level) {
  // Small initial buffers are only used at level 0. When repartitioning, the build
  // input is assumed to be fairly large, so small buffers would mostly waste memory.
  const bool start_with_small_buffers = (level == 0);
  build_rows_ = std::make_unique<BufferedTupleStream>(state, parent_->row_desc_,
      state->block_mgr(), parent_->block_mgr_client_, start_with_small_buffers,
      false /* read_write */);
}
// A partition must have been closed before it is destroyed; this is only verified in
// builds where DCHECK is active.
PhjBuilder::Partition::~Partition() {
DCHECK(IsClosed());
}
// Estimates the memory needed to hold this partition in memory: the byte size of the
// build rows plus the estimated size of a hash table over that many rows.
int64_t PhjBuilder::Partition::EstimatedInMemSize() const {
  const int64_t rows_bytes = build_rows_->byte_size();
  const int64_t hash_table_bytes = HashTable::EstimateSize(build_rows_->num_rows());
  return rows_bytes + hash_table_bytes;
}
// Closes the partition's hash table (recording its hash collisions in the parent's
// counter) and its build-row stream. 'batch' is forwarded to the stream's Close() and
// may be NULL. Does nothing if the partition is already closed.
void PhjBuilder::Partition::Close(RowBatch* batch) {
  if (IsClosed()) return;

  if (hash_tbl_ != NULL) {
    COUNTER_ADD(parent_->num_hash_collisions_, hash_tbl_->NumHashCollisions());
    hash_tbl_->Close();
  }

  if (build_rows_ == NULL) return;
  // Resources are flushed so that the memory becomes available for subsequent
  // partitions.
  build_rows_->Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES);
  build_rows_.reset();
}
// Spills this (open) partition: destroys its hash table, unpins the build stream with
// 'mode' and tries to move the stream to IO-sized buffers. Marks the partition as
// spilled and, the first time this happens for the partition, bumps the parent's
// spilled-partition counter.
Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) {
  DCHECK(!IsClosed());

  // Drop the hash table first so that its memory is released as early as possible.
  if (hash_tbl() != NULL) {
    hash_tbl_->Close();
    hash_tbl_.reset();
  }

  // Unpinning before the switch below improves the odds that SwitchToIoBuffers()
  // finds a free buffer.
  RETURN_IF_ERROR(build_rows_->UnpinStream(mode));
  if (build_rows_->using_small_buffers()) {
    bool switched;
    RETURN_IF_ERROR(build_rows_->SwitchToIoBuffers(&switched));
    if (!switched) {
      // Not fatal: the switch is attempted again once the small buffers fill up.
      VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition "
                 << this << " of join=" << parent_->join_node_id_
                 << " build small buffers=" << build_rows_->using_small_buffers();
      VLOG_FILE << GetStackTrace();
    }
  }

  const bool newly_spilled = !is_spilled_;
  is_spilled_ = true;
  if (newly_spilled) {
    COUNTER_ADD(parent_->num_spilled_partitions_, 1);
    // Tag the profile when the counter first reaches one.
    if (parent_->num_spilled_partitions_->value() == 1) {
      parent_->profile()->AppendExecOption("Spilled");
    }
  }
  return Status::OK();
}
// Attempts to pin this partition's build rows and build a hash table over them.
// '*built' is set to true only if the stream was pinned and every row was inserted.
// When pinning, hash table initialization or an insert fails, '*built' is false and
// the function still returns OK; in the latter two cases the hash table is destroyed
// first. A non-OK status is returned only for stream or query errors.
Status PhjBuilder::Partition::BuildHashTable(bool* built) {
SCOPED_TIMER(parent_->build_hash_table_timer_);
DCHECK(build_rows_ != NULL);
*built = false;
// Before building the hash table, we need to pin the rows in memory.
RETURN_IF_ERROR(build_rows_->PinStream(false, built));
if (!*built) return Status::OK();
RuntimeState* state = parent_->runtime_state_;
HashTableCtx* ctx = parent_->ht_ctx_.get();
ctx->set_level(level()); // Set the hash function for building the hash table.
RowBatch batch(parent_->row_desc_, state->batch_size(), parent_->mem_tracker());
vector<BufferedTupleStream::RowIdx> indices;
bool eos = false;
// Allocate the partition-local hash table. Initialize the number of buckets based on
// the number of build rows (the number of rows is known at this point). This assumes
// there are no duplicates which can be wrong. However, the upside in the common case
// (few/no duplicates) is large and the downside when there are is low (a bit more
// memory; the bucket memory is small compared to the memory needed for all the build
// side allocations).
// One corner case is if the stream contains tuples with zero footprint (no materialized
// slots). If the tuples occupy no space, this implies all rows will be duplicates, so
// create a small hash table, IMPALA-2256.
//
// TODO: Try to allocate the hash table before pinning the stream to avoid needlessly
// reading all of the spilled rows from disk when we won't succeed anyway.
int64_t estimated_num_buckets = build_rows()->RowConsumesMemory() ?
HashTable::EstimateNumBuckets(build_rows()->num_rows()) :
state->batch_size() * 2;
hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_,
true /* store_duplicates */, parent_->row_desc_.tuple_descriptors().size(),
build_rows(), 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets));
// Initialization failure is handled as "not built" rather than as an error.
if (!hash_tbl_->Init()) goto not_built;
bool got_read_buffer;
RETURN_IF_ERROR(build_rows_->PrepareForRead(false, &got_read_buffer));
DCHECK(got_read_buffer) << "Stream was already pinned.";
// Read the pinned stream batch by batch and insert every row into the hash table.
do {
RETURN_IF_ERROR(build_rows_->GetNext(&batch, &eos, &indices));
DCHECK_EQ(batch.num_rows(), indices.size());
DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
<< build_rows()->RowConsumesMemory();
TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
if (parent_->insert_batch_fn_ != NULL) {
// Codegen'd insert: level 0 has its own function.
InsertBatchFn insert_batch_fn;
if (level() == 0) {
insert_batch_fn = parent_->insert_batch_fn_level0_;
} else {
insert_batch_fn = parent_->insert_batch_fn_;
}
DCHECK(insert_batch_fn != NULL);
if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) {
goto not_built;
}
} else {
// Interpreted insert.
if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) goto not_built;
}
RETURN_IF_ERROR(state->GetQueryStatus());
// Free any local allocations made while inserting.
ExprContext::FreeLocalAllocations(parent_->expr_ctxs_to_free_);
batch.Reset();
} while (!eos);
// The hash table fits in memory and is built.
DCHECK(*built);
DCHECK(hash_tbl_ != NULL);
is_spilled_ = false;
COUNTER_ADD(parent_->num_hash_buckets_, hash_tbl_->num_buckets());
return Status::OK();
// Common exit for "could not build": report not built and discard the hash table.
not_built:
*built = false;
if (hash_tbl_ != NULL) {
hash_tbl_->Close();
hash_tbl_.reset();
}
return Status::OK();
}
// Generates the codegen'd versions of the build-side routines. The hash and
// row-evaluation functions are generated first; only if all of them succeed are the
// ProcessBuildBatch() and InsertBatch() variants generated. The outcome of each is
// recorded in the profile.
void PhjBuilder::Codegen(LlvmCodeGen* codegen) {
  Function* hash_fn;
  Function* murmur_hash_fn;
  Function* eval_build_row_fn;

  // Prerequisites: both hash function variants of the builder's hash table context,
  // plus the function that evaluates build rows.
  Status prereq_status = ht_ctx_->CodegenHashRow(codegen, false, &hash_fn);
  prereq_status.MergeStatus(ht_ctx_->CodegenHashRow(codegen, true, &murmur_hash_fn));
  prereq_status.MergeStatus(
      ht_ctx_->CodegenEvalRow(codegen, true, &eval_build_row_fn));

  // If a prerequisite failed, both routines report that failure.
  Status build_codegen_status = prereq_status;
  Status insert_codegen_status = prereq_status;
  if (prereq_status.ok()) {
    TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
    build_codegen_status =
        CodegenProcessBuildBatch(codegen, hash_fn, murmur_hash_fn, eval_build_row_fn);
    insert_codegen_status = CodegenInsertBatch(
        codegen, hash_fn, murmur_hash_fn, eval_build_row_fn, prefetch_mode);
  }

  profile()->AddCodegenMsg(build_codegen_status.ok(), build_codegen_status, "Build Side");
  profile()->AddCodegenMsg(insert_codegen_status.ok(), insert_codegen_status,
      "Hash Table Construction");
}
// Returns a multi-line description of the hash partitions: for each one its index and
// address, whether it is closed or spilled and, for open partitions, the number of
// build rows, pinned blocks and hash table rows.
string PhjBuilder::DebugString() const {
  stringstream out;
  out << "Hash partitions: " << hash_partitions_.size() << ":" << endl;
  for (int idx = 0; idx < hash_partitions_.size(); ++idx) {
    Partition* part = hash_partitions_[idx];
    out << " Hash partition " << idx << " ptr=" << part;
    if (part->IsClosed()) {
      // Nothing more can be reported for a closed partition.
      out << " Closed";
    } else {
      if (part->is_spilled()) out << " Spilled";
      DCHECK(part->build_rows() != NULL);
      out << endl
          << " Build Rows: " << part->build_rows()->num_rows()
          << " (Blocks pinned: " << part->build_rows()->blocks_pinned() << ")" << endl;
      if (part->hash_tbl() != NULL) {
        out << " Hash Table Rows: " << part->hash_tbl()->size() << endl;
      }
    }
  }
  return out.str();
}
// Generates two specialized versions of ProcessBuildBatch():
//  - a level-0 version that calls 'hash_fn' and has the 'build_filters' argument
//    fixed to whether any runtime filters are registered;
//  - a version for the other levels that calls 'murmur_hash_fn' and never builds
//    filters.
// Both versions call 'eval_row_fn' for evaluating build rows and have hash table
// parameters replaced by constants. On success both functions are handed to the JIT,
// which fills in 'process_build_batch_fn_' and 'process_build_batch_fn_level0_'.
// Returns an error if constant replacement fails or if either function fails
// verification.
Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
  Function* process_build_batch_fn =
      codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true);
  DCHECK(process_build_batch_fn != NULL);

  // Replace call sites
  int replaced =
      codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn, "EvalBuildRow");
  DCHECK_EQ(replaced, 1);

  // Replace some hash table parameters with constants.
  HashTableCtx::HashTableReplacedConstants replaced_constants;
  const bool stores_duplicates = true;
  const int num_build_tuples = row_desc_.tuple_descriptors().size();
  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates,
      num_build_tuples, process_build_batch_fn, &replaced_constants));
  DCHECK_GE(replaced_constants.stores_nulls, 1);
  DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
  DCHECK_EQ(replaced_constants.stores_duplicates, 0);
  DCHECK_EQ(replaced_constants.stores_tuples, 0);
  DCHECK_EQ(replaced_constants.quadratic_probing, 0);

  // Clone only after the replacements shared by both versions have been applied; the
  // hash function and the 'build_filters' argument are specialized per version below.
  Function* process_build_batch_fn_level0 =
      codegen->CloneFunction(process_build_batch_fn);

  // Always build runtime filters at level0 (if there are any).
  // Note that the first argument of this function is the return value.
  Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 4);
  build_filters_l0_arg->replaceAllUsesWith(
      ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));

  // process_build_batch_fn_level0 uses CRC hash if available,
  replaced =
      codegen->ReplaceCallSites(process_build_batch_fn_level0, hash_fn, "HashRow");
  DCHECK_EQ(replaced, 1);

  // process_build_batch_fn uses murmur
  replaced =
      codegen->ReplaceCallSites(process_build_batch_fn, murmur_hash_fn, "HashRow");
  DCHECK_EQ(replaced, 1);

  // Never build filters after repartitioning, as all rows have already been added to the
  // filters during the level0 build. Note that the first argument of this function is the
  // return value.
  Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 4);
  build_filters_arg->replaceAllUsesWith(
      ConstantInt::get(Type::getInt1Ty(codegen->context()), false));

  // Finalize ProcessBuildBatch functions
  process_build_batch_fn = codegen->FinalizeFunction(process_build_batch_fn);
  if (process_build_batch_fn == NULL) {
    return Status(
        "Codegen'd PhjBuilder::ProcessBuildBatch() function "
        "failed verification, see log");
  }
  process_build_batch_fn_level0 =
      codegen->FinalizeFunction(process_build_batch_fn_level0);
  // Check the level-0 function that was just finalized. Re-checking
  // 'process_build_batch_fn' here would let a failed level-0 verification slip through
  // and register a NULL function with the JIT.
  if (process_build_batch_fn_level0 == NULL) {
    return Status(
        "Codegen'd level-zero PhjBuilder::ProcessBuildBatch() "
        "function failed verification, see log");
  }

  // Register native function pointers
  codegen->AddFunctionToJit(
      process_build_batch_fn, reinterpret_cast<void**>(&process_build_batch_fn_));
  codegen->AddFunctionToJit(process_build_batch_fn_level0,
      reinterpret_cast<void**>(&process_build_batch_fn_level0_));
  return Status::OK();
}
// Generates two specialized versions of InsertBatch(): a level-0 version that calls
// 'hash_fn' and a version for the other levels that calls 'murmur_hash_fn'. In both,
// the 'prefetch_mode' argument is replaced by the given constant, the EvalBuildRow()
// and Equals() call sites are replaced by codegen'd functions, and hash table
// parameters are replaced by constants. On success both functions are handed to the
// JIT, which fills in 'insert_batch_fn_' and 'insert_batch_fn_level0_'.
// Returns an error if generating Equals() or replacing constants fails, or if either
// function fails verification.
Status PhjBuilder::CodegenInsertBatch(LlvmCodeGen* codegen, Function* hash_fn,
Function* murmur_hash_fn, Function* eval_row_fn, TPrefetchMode::type prefetch_mode) {
Function* insert_batch_fn = codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true);
Function* build_equals_fn;
RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &build_equals_fn));
// Replace the parameter 'prefetch_mode' with constant.
Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
prefetch_mode_arg->replaceAllUsesWith(
ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
// Use codegen'd EvalBuildRow() function
int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
DCHECK_EQ(replaced, 1);
// Use codegen'd Equals() function
replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals");
DCHECK_EQ(replaced, 1);
// Replace hash-table parameters with constants.
HashTableCtx::HashTableReplacedConstants replaced_constants;
const bool stores_duplicates = true;
const int num_build_tuples = row_desc_.tuple_descriptors().size();
RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates,
num_build_tuples, insert_batch_fn, &replaced_constants));
DCHECK_GE(replaced_constants.stores_nulls, 1);
DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
DCHECK_GE(replaced_constants.stores_duplicates, 1);
DCHECK_GE(replaced_constants.stores_tuples, 1);
DCHECK_GE(replaced_constants.quadratic_probing, 1);
// Clone after the replacements shared by both versions; only the hash function
// differs between the two from here on.
Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn);
// Use codegen'd hash functions
replaced = codegen->ReplaceCallSites(insert_batch_fn_level0, hash_fn, "HashRow");
DCHECK_EQ(replaced, 1);
replaced = codegen->ReplaceCallSites(insert_batch_fn, murmur_hash_fn, "HashRow");
DCHECK_EQ(replaced, 1);
// Finalize both versions; a NULL result is reported as a verification failure.
insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn);
if (insert_batch_fn == NULL) {
return Status(
"PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd "
"InsertBatch() function failed verification, see log");
}
insert_batch_fn_level0 = codegen->FinalizeFunction(insert_batch_fn_level0);
if (insert_batch_fn_level0 == NULL) {
return Status(
"PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd zero-level "
"InsertBatch() function failed verification, see log");
}
// Register native function pointers.
codegen->AddFunctionToJit(insert_batch_fn, reinterpret_cast<void**>(&insert_batch_fn_));
codegen->AddFunctionToJit(
insert_batch_fn_level0, reinterpret_cast<void**>(&insert_batch_fn_level0_));
return Status::OK();
}