| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/partitioned-hash-join-builder.h" |
| |
| #include <numeric> |
| |
| #include <gutil/strings/substitute.h> |
| |
| #include "codegen/llvm-codegen.h" |
| #include "exec/hash-table.inline.h" |
| #include "exprs/expr-context.h" |
| #include "exprs/expr.h" |
| #include "runtime/buffered-tuple-stream.h" |
| #include "runtime/mem-tracker.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/runtime-filter-bank.h" |
| #include "runtime/runtime-filter.h" |
| #include "runtime/runtime-state.h" |
| #include "util/bloom-filter.h" |
| #include "util/runtime-profile-counters.h" |
| |
| #include "gen-cpp/PlanNodes_types.h" |
| |
| #include "common/names.h" |
| |
| static const string PREPARE_FOR_READ_FAILED_ERROR_MSG = |
| "Failed to acquire initial read " |
| "buffer for stream in hash join node $0. Reducing query concurrency or increasing " |
| "the memory limit may help this query to complete successfully."; |
| |
| using namespace impala; |
| using namespace llvm; |
| using namespace strings; |
| using std::unique_ptr; |
| |
| PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op, |
| const RowDescriptor& probe_row_desc, const RowDescriptor& build_row_desc, |
| RuntimeState* state) |
| : DataSink(build_row_desc), |
| runtime_state_(state), |
| join_node_id_(join_node_id), |
| join_op_(join_op), |
| probe_row_desc_(probe_row_desc), |
| block_mgr_client_(NULL), |
| non_empty_build_(false), |
| partitions_created_(NULL), |
| largest_partition_percent_(NULL), |
| max_partition_level_(NULL), |
| num_build_rows_partitioned_(NULL), |
| num_hash_collisions_(NULL), |
| num_hash_buckets_(NULL), |
| num_spilled_partitions_(NULL), |
| num_repartitions_(NULL), |
| partition_build_rows_timer_(NULL), |
| build_hash_table_timer_(NULL), |
| repartition_timer_(NULL), |
| null_aware_partition_(NULL), |
| process_build_batch_fn_(NULL), |
| process_build_batch_fn_level0_(NULL), |
| insert_batch_fn_(NULL), |
| insert_batch_fn_level0_(NULL) {} |
| |
| Status PhjBuilder::Init(RuntimeState* state, |
| const vector<TEqJoinCondition>& eq_join_conjuncts, |
| const vector<TRuntimeFilterDesc>& filters) { |
| for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) { |
| ExprContext* ctx; |
| RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, eq_join_conjunct.right, &ctx)); |
| build_expr_ctxs_.push_back(ctx); |
| is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from); |
| } |
| |
| for (const TRuntimeFilterDesc& filter : filters) { |
| // If filter propagation is not enabled, only consider building filters for |
| // broadcast joins (which may be consumed within this fragment). |
| if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL |
| && !filter.is_broadcast_join) { |
| continue; |
| } |
| if (state->query_options().disable_row_runtime_filtering |
| && !filter.applied_on_partition_columns) { |
| continue; |
| } |
| FilterContext filter_ctx; |
| filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, true); |
| RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, filter.src_expr, &filter_ctx.expr_ctx)); |
| filters_.push_back(filter_ctx); |
| } |
| return Status::OK(); |
| } |
| |
| string PhjBuilder::GetName() { |
| return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id_); |
| } |
| |
| Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { |
| RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker)); |
| RETURN_IF_ERROR( |
| Expr::Prepare(build_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get())); |
| expr_ctxs_to_free_.insert( |
| expr_ctxs_to_free_.end(), build_expr_ctxs_.begin(), build_expr_ctxs_.end()); |
| |
| for (const FilterContext& ctx : filters_) { |
| RETURN_IF_ERROR(ctx.expr_ctx->Prepare(state, row_desc_, expr_mem_tracker_.get())); |
| expr_ctxs_to_free_.push_back(ctx.expr_ctx); |
| } |
| RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, build_expr_ctxs_, |
| HashTableStoresNulls(), is_not_distinct_from_, state->fragment_hash_seed(), |
| MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), mem_tracker_.get(), |
| &ht_ctx_)); |
| RETURN_IF_ERROR(state->block_mgr()->RegisterClient( |
| Substitute("PartitionedHashJoin id=$0 builder=$1", join_node_id_, this), |
| MinRequiredBuffers(), true, mem_tracker_.get(), state, &block_mgr_client_)); |
| |
| partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT); |
| largest_partition_percent_ = |
| profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT); |
| max_partition_level_ = |
| profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT); |
| num_build_rows_partitioned_ = |
| ADD_COUNTER(profile(), "BuildRowsPartitioned", TUnit::UNIT); |
| num_hash_collisions_ = ADD_COUNTER(profile(), "HashCollisions", TUnit::UNIT); |
| num_hash_buckets_ = ADD_COUNTER(profile(), "HashBuckets", TUnit::UNIT); |
| num_spilled_partitions_ = ADD_COUNTER(profile(), "SpilledPartitions", TUnit::UNIT); |
| num_repartitions_ = ADD_COUNTER(profile(), "NumRepartitions", TUnit::UNIT); |
| partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime"); |
| build_hash_table_timer_ = ADD_TIMER(profile(), "HashTablesBuildTime"); |
| repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime"); |
| if (state->CodegenDisabledByQueryOption()) { |
| profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN"); |
| } else if (state->CodegenDisabledByHint()) { |
| profile()->AddCodegenMsg(false, "disabled due to optimization hints"); |
| } |
| return Status::OK(); |
| } |
| |
| Status PhjBuilder::Open(RuntimeState* state) { |
| RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state)); |
| for (const FilterContext& filter : filters_) { |
| RETURN_IF_ERROR(filter.expr_ctx->Open(state)); |
| } |
| RETURN_IF_ERROR(CreateHashPartitions(0)); |
| AllocateRuntimeFilters(); |
| |
| if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { |
| RETURN_IF_ERROR(CreateAndPreparePartition(0, &null_aware_partition_)); |
| } |
| return Status::OK(); |
| } |
| |
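| // Partitions the rows of 'batch' into the build streams of 'hash_partitions_', using |
| // the codegen'd variant if one was compiled. At level 0 the rows are also added to |
| // the runtime filters. |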
| Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) { |
| SCOPED_TIMER(partition_build_rows_timer_); |
| bool build_filters = ht_ctx_->level() == 0 && filters_.size() > 0; |
| if (process_build_batch_fn_ == NULL) { |
| RETURN_IF_ERROR(ProcessBuildBatch(batch, ht_ctx_.get(), build_filters)); |
| } else { |
| DCHECK(process_build_batch_fn_level0_ != NULL); |
| if (ht_ctx_->level() == 0) { |
| RETURN_IF_ERROR( |
| process_build_batch_fn_level0_(this, batch, ht_ctx_.get(), build_filters)); |
| } else { |
| RETURN_IF_ERROR(process_build_batch_fn_(this, batch, ht_ctx_.get(), build_filters)); |
| } |
| } |
| |
| // Free any local allocations made during partitioning. |
| ExprContext::FreeLocalAllocations(expr_ctxs_to_free_); |
| COUNTER_ADD(num_build_rows_partitioned_, batch->num_rows()); |
| return Status::OK(); |
| } |
| |
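| // Called when the build input is exhausted: records partition skew statistics, |
| // publishes the runtime filters if this is the initial (level-0) build, and then |
| // builds the hash tables and prepares the probe streams for spilled partitions. |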
| Status PhjBuilder::FlushFinal(RuntimeState* state) { |
| int64_t num_build_rows = 0; |
| for (Partition* partition : hash_partitions_) { |
| num_build_rows += partition->build_rows()->num_rows(); |
| } |
| |
| if (num_build_rows > 0) { |
| double largest_fraction = 0.0; |
| for (Partition* partition : hash_partitions_) { |
| largest_fraction = max(largest_fraction, |
| partition->build_rows()->num_rows() / static_cast<double>(num_build_rows)); |
| } |
| COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(largest_fraction * 100)); |
| } |
| |
| if (VLOG_IS_ON(2)) { |
| stringstream ss; |
| ss << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:", join_node_id_, |
| hash_partitions_[0]->level(), num_build_rows); |
| for (int i = 0; i < hash_partitions_.size(); ++i) { |
| Partition* partition = hash_partitions_[i]; |
| double percent = num_build_rows == 0 ? 0.0 : partition->build_rows()->num_rows() |
| * 100 / static_cast<double>(num_build_rows); |
| ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled") |
| << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl |
| << " #rows:" << partition->build_rows()->num_rows() << endl; |
| } |
| VLOG(2) << ss.str(); |
| } |
| |
| if (ht_ctx_->level() == 0) { |
| PublishRuntimeFilters(num_build_rows); |
| non_empty_build_ |= (num_build_rows > 0); |
| } |
| |
| RETURN_IF_ERROR(BuildHashTablesAndPrepareProbeStreams()); |
| return Status::OK(); |
| } |
| |
| void PhjBuilder::Close(RuntimeState* state) { |
| if (closed_) return; |
| ExprContext::FreeLocalAllocations(expr_ctxs_to_free_); |
| CloseAndDeletePartitions(); |
| if (ht_ctx_ != NULL) ht_ctx_->Close(); |
| Expr::Close(build_expr_ctxs_, state); |
| for (const FilterContext& ctx : filters_) ctx.expr_ctx->Close(state); |
| if (block_mgr_client_ != NULL) state->block_mgr()->ClearReservations(block_mgr_client_); |
| pool_.Clear(); |
| DataSink::Close(state); |
| closed_ = true; |
| } |
| |
| void PhjBuilder::Reset() { |
| ExprContext::FreeLocalAllocations(expr_ctxs_to_free_); |
| non_empty_build_ = false; |
| CloseAndDeletePartitions(); |
| } |
| |
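| // Creates a new partition at 'level', adds it to 'all_partitions_' and prepares its |
| // build stream for writing, returning a "memory limit too low" error if no write |
| // buffer can be acquired for it. |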
| Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) { |
| all_partitions_.emplace_back(new Partition(runtime_state_, this, level)); |
| *partition = all_partitions_.back().get(); |
| RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_id_, profile(), true)); |
| bool got_buffer; |
| RETURN_IF_ERROR((*partition)->build_rows()->PrepareForWrite(&got_buffer)); |
| if (!got_buffer) { |
| return runtime_state_->block_mgr()->MemLimitTooLowError( |
| block_mgr_client_, join_node_id_); |
| } |
| return Status::OK(); |
| } |
| |
| Status PhjBuilder::CreateHashPartitions(int level) { |
| DCHECK(hash_partitions_.empty()); |
| ht_ctx_->set_level(level); // Set the hash function for partitioning input. |
| for (int i = 0; i < PARTITION_FANOUT; ++i) { |
| Partition* new_partition; |
| RETURN_IF_ERROR(CreateAndPreparePartition(level, &new_partition)); |
| hash_partitions_.push_back(new_partition); |
| } |
| COUNTER_ADD(partitions_created_, PARTITION_FANOUT); |
| COUNTER_SET(max_partition_level_, level); |
| return Status::OK(); |
| } |
| |
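| // Slow path, called when AddRow() fails on 'stream': first try to switch the stream |
| // from small buffers to IO-sized buffers, then repeatedly spill partitions until the |
| // row can be appended or no spillable partition remains (which is an error). |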
| bool PhjBuilder::AppendRowStreamFull( |
| BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept { |
| while (true) { |
| // Check if the stream is still using small buffers and try to switch to IO-buffers. |
| if (stream->using_small_buffers()) { |
| bool got_buffer; |
| *status = stream->SwitchToIoBuffers(&got_buffer); |
| if (!status->ok()) return false; |
| |
| if (got_buffer) { |
| if (LIKELY(stream->AddRow(row, status))) return true; |
| if (!status->ok()) return false; |
| } |
| } |
| // We ran out of memory. Pick a partition to spill. If we ran out of unspilled |
| // partitions, SpillPartition() will return an error status. |
| *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); |
| if (!status->ok()) return false; |
| if (stream->AddRow(row, status)) return true; |
| if (!status->ok()) return false; |
| // Spilling one partition does not guarantee we can append a row. Keep |
| // spilling until we can append this row. |
| } |
| } |
| |
| // TODO: can we do better with a different spilling heuristic? |
| Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) { |
| DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT); |
| int64_t max_freed_mem = 0; |
| int partition_idx = -1; |
| |
| // Iterate over the partitions and pick the one that frees the most memory to spill. |
| for (int i = 0; i < PARTITION_FANOUT; ++i) { |
| Partition* candidate = hash_partitions_[i]; |
| if (candidate->IsClosed()) continue; |
| if (candidate->is_spilled()) continue; |
| int64_t mem = candidate->build_rows()->bytes_in_mem(false); |
| if (candidate->hash_tbl() != NULL) { |
| // The hash table should not have matches, since we have not probed it yet. |
| // Losing match info would lead to incorrect results (IMPALA-1488). |
| DCHECK(!candidate->hash_tbl()->HasMatches()); |
| mem += candidate->hash_tbl()->ByteSize(); |
| } |
| if (mem > max_freed_mem) { |
| max_freed_mem = mem; |
| partition_idx = i; |
| } |
| } |
| |
| if (partition_idx == -1) { |
| // Could not find a partition to spill. This means the mem limit was just too low. |
| return runtime_state_->block_mgr()->MemLimitTooLowError( |
| block_mgr_client_, join_node_id_); |
| } |
| |
| VLOG(2) << "Spilling partition: " << partition_idx << endl << DebugString(); |
| Partition* build_partition = hash_partitions_[partition_idx]; |
| RETURN_IF_ERROR(build_partition->Spill(mode)); |
| return Status::OK(); |
| } |
| |
| // When this function is called, we've finished processing the current build input |
| // (either from the child ExecNode or from repartitioning a spilled partition). The build |
| // rows have only been partitioned; we still need to build hash tables over them. Some |
| // of the partitions may already have been spilled, and attempting to build hash |
| // tables over the non-spilled ones can cause more of them to spill. |
| // |
| // At the end of the function all partitions either have a hash table (and therefore are |
| // not spilled) or are spilled. Partitions that have hash tables don't need to spill on |
| // the probe side. |
| // |
| // This maps naturally to a 0-1 knapsack problem, where the weight of a partition is |
| // the memory needed to keep its build rows and hash table, and the value is the |
| // expected IO savings. For now, we go with a greedy solution. |
| // |
| // TODO: implement the knapsack solution. |
| Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() { |
| DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size()); |
| |
| for (int i = 0; i < PARTITION_FANOUT; ++i) { |
| Partition* partition = hash_partitions_[i]; |
| if (partition->build_rows()->num_rows() == 0) { |
| // This partition is empty, no need to do anything else. |
| partition->Close(NULL); |
| } else if (partition->is_spilled()) { |
| // We don't need any build-side data for spilled partitions in memory. |
| RETURN_IF_ERROR( |
| partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL)); |
| } |
| } |
| |
| // Allocate probe buffers for all partitions that are already spilled. Do this before |
| // building hash tables because allocating probe buffers may cause some more partitions |
| // to be spilled. This avoids wasted work on building hash tables for partitions that |
| // won't fit in memory alongside the required probe buffers. |
| RETURN_IF_ERROR(InitSpilledPartitionProbeStreams()); |
| |
| for (int i = 0; i < PARTITION_FANOUT; ++i) { |
| Partition* partition = hash_partitions_[i]; |
| if (partition->IsClosed() || partition->is_spilled()) continue; |
| |
| bool built = false; |
| DCHECK(partition->build_rows()->is_pinned()); |
| RETURN_IF_ERROR(partition->BuildHashTable(&built)); |
| // If we did not have enough memory to build this hash table, we need to spill this |
| // partition (clean up the hash table, unpin build). |
| if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL)); |
| } |
| |
| // We may have spilled additional partitions while building hash tables, so we need |
| // to initialize probe buffers for them. |
| // TODO: once we have reliable reservations (IMPALA-3200) this should no longer be |
| // necessary: we will know exactly how many partitions will fit in memory and we can |
| // avoid building then immediately destroying hash tables. |
| RETURN_IF_ERROR(InitSpilledPartitionProbeStreams()); |
| |
| // TODO: at this point we could have freed enough memory to pin and build some |
| // spilled partitions. This can happen, for example if there is a lot of skew. |
| // Partition 1: 10GB (pinned initially). |
| // Partition 2,3,4: 1GB (spilled during partitioning the build). |
| // In the previous step, we could have unpinned 10GB (because there was not enough |
| // memory to build a hash table over it) which can now free enough memory to |
| // build hash tables over the remaining 3 partitions. |
| // However, we always spill the largest partition first, so the build input would |
| // have to be fairly pathological for this to matter. Investigate whether this is |
| // worthwhile. |
| return Status::OK(); |
| } |
| |
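| // Ensures there is one probe stream with a write buffer for each spilled partition. |
| // Acquiring a write buffer may itself force more partitions to spill, in which case |
| // the loop creates additional streams for them. |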
| Status PhjBuilder::InitSpilledPartitionProbeStreams() { |
| DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size()); |
| |
| int num_spilled_partitions = 0; |
| for (int i = 0; i < PARTITION_FANOUT; ++i) { |
| Partition* partition = hash_partitions_[i]; |
| if (!partition->IsClosed() && partition->is_spilled()) ++num_spilled_partitions; |
| } |
| int probe_streams_to_create = |
| num_spilled_partitions - spilled_partition_probe_streams_.size(); |
| |
| while (probe_streams_to_create > 0) { |
| // Create the stream in the vector so that it is cleaned up if a later step fails. |
| spilled_partition_probe_streams_.emplace_back(std::make_unique<BufferedTupleStream>( |
| runtime_state_, probe_row_desc_, runtime_state_->block_mgr(), block_mgr_client_, |
| false /* use_initial_small_buffers */, false /* read_write */)); |
| BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get(); |
| RETURN_IF_ERROR(probe_stream->Init(join_node_id_, profile(), false)); |
| |
| // Loop until either the stream gets a buffer or all partitions are spilled (in which |
| // case SpillPartition() returns an error). |
| while (true) { |
| bool got_buffer; |
| RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer)); |
| if (got_buffer) break; |
| |
| RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL)); |
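| // The partition we just spilled will need its own probe stream, so create one more. |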
| ++probe_streams_to_create; |
| } |
| --probe_streams_to_create; |
| } |
| return Status::OK(); |
| } |
| |
| vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() { |
| return std::move(spilled_partition_probe_streams_); |
| } |
| |
| void PhjBuilder::CloseAndDeletePartitions() { |
| // Close all the partitions and clean up all references to them. |
| for (unique_ptr<Partition>& partition : all_partitions_) partition->Close(NULL); |
| all_partitions_.clear(); |
| hash_partitions_.clear(); |
| null_aware_partition_ = NULL; |
| for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) { |
| stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); |
| } |
| spilled_partition_probe_streams_.clear(); |
| } |
| |
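| // Allocates a local scratch Bloom filter for each filter registered in Init(); the |
| // local filters are later handed to the filter bank in PublishRuntimeFilters(). |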
| void PhjBuilder::AllocateRuntimeFilters() { |
| DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filters_.size() == 0) |
| << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN"; |
| DCHECK(ht_ctx_ != NULL); |
| for (int i = 0; i < filters_.size(); ++i) { |
| filters_[i].local_bloom_filter = |
| runtime_state_->filter_bank()->AllocateScratchBloomFilter( |
| filters_[i].filter->id()); |
| } |
| } |
| |
| void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) { |
| int32_t num_enabled_filters = 0; |
| // Use 'num_build_rows' to estimate the FP rate of each Bloom filter, and publish |
| // 'always-true' filters if it is too high. Doing so saves CPU at the coordinator and |
| // serialization time, and reduces the cost of applying the filter at the scan - most |
| // significantly for per-row filters. However, the number of build rows can be a very |
| // poor estimate of the NDV, particularly if the filter expression is a function of |
| // several columns. |
| // TODO: Better heuristic. |
| for (const FilterContext& ctx : filters_) { |
| // TODO: Consider checking actual number of bits set in filter to compute FP rate. |
| // TODO: Consider checking this every few batches or so. |
| bool fp_rate_too_high = runtime_state_->filter_bank()->FpRateTooHigh( |
| ctx.filter->filter_size(), num_build_rows); |
| runtime_state_->filter_bank()->UpdateFilterFromLocal(ctx.filter->id(), |
| fp_rate_too_high ? BloomFilter::ALWAYS_TRUE_FILTER : ctx.local_bloom_filter); |
| |
| num_enabled_filters += !fp_rate_too_high; |
| } |
| |
| if (filters_.size() > 0) { |
| string info_string; |
| if (num_enabled_filters == filters_.size()) { |
| info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(), |
| filters_.size() == 1 ? "" : "s"); |
| } else { |
| info_string = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled", |
| num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s", |
| filters_.size() - num_enabled_filters); |
| } |
| profile()->AddInfoString("Runtime filters", info_string); |
| } |
| } |
| |
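| // Reads back the build rows of 'input_partition' and redistributes them into a new |
| // set of hash partitions at 'level'. Also ensures a read buffer is available for |
| // 'input_probe_rows' before the new hash tables are built in FlushFinal(). |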
| Status PhjBuilder::RepartitionBuildInput( |
| Partition* input_partition, int level, BufferedTupleStream* input_probe_rows) { |
| DCHECK_GE(level, 1); |
| SCOPED_TIMER(repartition_timer_); |
| COUNTER_ADD(num_repartitions_, 1); |
| RuntimeState* state = runtime_state_; |
| |
| // Set up the read buffer and the new partitions. |
| BufferedTupleStream* build_rows = input_partition->build_rows(); |
| DCHECK(build_rows != NULL); |
| bool got_read_buffer; |
| RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer)); |
| if (!got_read_buffer) { |
| return mem_tracker()->MemLimitExceeded( |
| state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, join_node_id_)); |
| } |
| RETURN_IF_ERROR(CreateHashPartitions(level)); |
| |
| // Repartition the build rows of 'input_partition' into 'hash_partitions_'. |
| RowBatch build_batch(row_desc_, state->batch_size(), mem_tracker()); |
| bool eos = false; |
| while (!eos) { |
| RETURN_IF_CANCELLED(state); |
| RETURN_IF_ERROR(state->CheckQueryState()); |
| |
| RETURN_IF_ERROR(build_rows->GetNext(&build_batch, &eos)); |
| RETURN_IF_ERROR(Send(state, &build_batch)); |
| build_batch.Reset(); |
| } |
| |
| // Done reading the input, we can safely close it now to free memory. |
| input_partition->Close(NULL); |
| |
| // We just freed up the buffer used for reading build rows. Ensure a buffer is |
| // allocated for reading probe rows before we build the hash tables in FlushFinal(). |
| // TODO: once we have reliable reservations (IMPALA-3200) we can just hand off the |
| // reservation and avoid this complication. |
| while (true) { |
| bool got_buffer; |
| RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer)); |
| if (got_buffer) break; |
| RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); |
| } |
| |
| RETURN_IF_ERROR(FlushFinal(state)); |
| return Status::OK(); |
| } |
| |
| int64_t PhjBuilder::LargestPartitionRows() const { |
| int64_t max_rows = 0; |
| for (int i = 0; i < hash_partitions_.size(); ++i) { |
| Partition* partition = hash_partitions_[i]; |
| DCHECK(partition != NULL); |
| if (partition->IsClosed()) continue; |
| int64_t rows = partition->build_rows()->num_rows(); |
| if (rows > max_rows) max_rows = rows; |
| } |
| return max_rows; |
| } |
| |
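| // NULLs must be stored if any equi-join conjunct is NOT DISTINCT FROM (NULLs compare |
| // equal) or if unmatched build rows are returned (right outer/anti and full outer |
| // joins), since those rows must be kept even when their keys contain NULLs. |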
| bool PhjBuilder::HashTableStoresNulls() const { |
| return join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN |
| || join_op_ == TJoinOp::FULL_OUTER_JOIN |
| || std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), |
| false, std::logical_or<bool>()); |
| } |
| |
| PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level) |
| : parent_(parent), is_spilled_(false), level_(level) { |
| // If we're repartitioning, we can assume the build input is fairly large and small |
| // buffers will most likely just waste memory. |
| bool use_initial_small_buffers = level == 0; |
| build_rows_ = |
| std::make_unique<BufferedTupleStream>(state, parent_->row_desc_, state->block_mgr(), |
| parent_->block_mgr_client_, use_initial_small_buffers, false /* read_write */); |
| } |
| |
| PhjBuilder::Partition::~Partition() { |
| DCHECK(IsClosed()); |
| } |
| |
| int64_t PhjBuilder::Partition::EstimatedInMemSize() const { |
| return build_rows_->byte_size() + HashTable::EstimateSize(build_rows_->num_rows()); |
| } |
| |
| void PhjBuilder::Partition::Close(RowBatch* batch) { |
| if (IsClosed()) return; |
| |
| if (hash_tbl_ != NULL) { |
| COUNTER_ADD(parent_->num_hash_collisions_, hash_tbl_->NumHashCollisions()); |
| hash_tbl_->Close(); |
| } |
| |
| // Transfer ownership of 'build_rows_' memory to 'batch' if 'batch' is not NULL. |
| // Flush out the resources to free up memory for subsequent partitions. |
| if (build_rows_ != NULL) { |
| build_rows_->Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES); |
| build_rows_.reset(); |
| } |
| } |
| |
| Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) { |
| DCHECK(!IsClosed()); |
| // Close the hash table as soon as possible to release memory. |
| if (hash_tbl() != NULL) { |
| hash_tbl_->Close(); |
| hash_tbl_.reset(); |
| } |
| |
| // Unpin the stream as soon as possible to increase the chances that the |
| // SwitchToIoBuffers() call below will succeed. |
| RETURN_IF_ERROR(build_rows_->UnpinStream(mode)); |
| |
| if (build_rows_->using_small_buffers()) { |
| bool got_buffer; |
| RETURN_IF_ERROR(build_rows_->SwitchToIoBuffers(&got_buffer)); |
| if (!got_buffer) { |
| // We'll try to get IO buffers again when the stream fills up its small buffers. |
| VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition " |
| << this << " of join=" << parent_->join_node_id_ |
| << " build small buffers=" << build_rows_->using_small_buffers(); |
| VLOG_FILE << GetStackTrace(); |
| } |
| } |
| |
| if (!is_spilled_) { |
| COUNTER_ADD(parent_->num_spilled_partitions_, 1); |
| if (parent_->num_spilled_partitions_->value() == 1) { |
| parent_->profile()->AppendExecOption("Spilled"); |
| } |
| } |
| is_spilled_ = true; |
| return Status::OK(); |
| } |
| |
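| // Pins the build stream, sizes a hash table from its row count and inserts all build |
| // rows into it. On success '*built' is true; if memory runs out at any step, the |
| // partially built table is freed and '*built' is false. |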
| Status PhjBuilder::Partition::BuildHashTable(bool* built) { |
| SCOPED_TIMER(parent_->build_hash_table_timer_); |
| DCHECK(build_rows_ != NULL); |
| *built = false; |
| |
| // Before building the hash table, we need to pin the rows in memory. |
| RETURN_IF_ERROR(build_rows_->PinStream(false, built)); |
| if (!*built) return Status::OK(); |
| |
| RuntimeState* state = parent_->runtime_state_; |
| HashTableCtx* ctx = parent_->ht_ctx_.get(); |
| ctx->set_level(level()); // Set the hash function for building the hash table. |
| RowBatch batch(parent_->row_desc_, state->batch_size(), parent_->mem_tracker()); |
| vector<BufferedTupleStream::RowIdx> indices; |
| bool eos = false; |
| |
| // Allocate the partition-local hash table. Initialize the number of buckets based on |
| // the number of build rows (which is known at this point). This assumes there are no |
| // duplicates, which can be wrong. However, the upside in the common case (few or no |
| // duplicates) is large and the downside when there are duplicates is low (a bit more |
| // memory; the bucket memory is small compared to the memory needed for all the build |
| // side allocations). |
| // One corner case is a stream of tuples with zero footprint (no materialized slots). |
| // If the tuples occupy no space, this implies all rows will be duplicates, so we |
| // create a small hash table instead (see IMPALA-2256). |
| // |
| // TODO: Try to allocate the hash table before pinning the stream to avoid needlessly |
| // reading all of the spilled rows from disk when we won't succeed anyway. |
| int64_t estimated_num_buckets = build_rows()->RowConsumesMemory() ? |
| HashTable::EstimateNumBuckets(build_rows()->num_rows()) : |
| state->batch_size() * 2; |
| hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_, |
| true /* store_duplicates */, parent_->row_desc_.tuple_descriptors().size(), |
| build_rows(), 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets)); |
| if (!hash_tbl_->Init()) goto not_built; |
| |
| bool got_read_buffer; |
| RETURN_IF_ERROR(build_rows_->PrepareForRead(false, &got_read_buffer)); |
| DCHECK(got_read_buffer) << "Stream was already pinned."; |
| do { |
| RETURN_IF_ERROR(build_rows_->GetNext(&batch, &eos, &indices)); |
| DCHECK_EQ(batch.num_rows(), indices.size()); |
| DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets()) |
| << build_rows()->RowConsumesMemory(); |
| TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode; |
| if (parent_->insert_batch_fn_ != NULL) { |
| InsertBatchFn insert_batch_fn; |
| if (level() == 0) { |
| insert_batch_fn = parent_->insert_batch_fn_level0_; |
| } else { |
| insert_batch_fn = parent_->insert_batch_fn_; |
| } |
| DCHECK(insert_batch_fn != NULL); |
| if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) { |
| goto not_built; |
| } |
| } else { |
| if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) goto not_built; |
| } |
| RETURN_IF_ERROR(state->GetQueryStatus()); |
| // Free any local allocations made while inserting. |
| ExprContext::FreeLocalAllocations(parent_->expr_ctxs_to_free_); |
| batch.Reset(); |
| } while (!eos); |
| |
| // The hash table fits in memory and is built. |
| DCHECK(*built); |
| DCHECK(hash_tbl_ != NULL); |
| is_spilled_ = false; |
| COUNTER_ADD(parent_->num_hash_buckets_, hash_tbl_->num_buckets()); |
| return Status::OK(); |
| |
| not_built: |
| *built = false; |
| if (hash_tbl_ != NULL) { |
| hash_tbl_->Close(); |
| hash_tbl_.reset(); |
| } |
| return Status::OK(); |
| } |
| |
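| // Codegens ProcessBuildBatch() and InsertBatch(), each in two variants: a level-0 |
| // version that uses the CRC hash (if available) and a repartitioning version that |
| // uses the Murmur hash. Runtime filters are only built in the level-0 variant. |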
| void PhjBuilder::Codegen(LlvmCodeGen* codegen) { |
| Status build_codegen_status; |
| Status insert_codegen_status; |
| Status codegen_status; |
| |
| // Codegen for hashing rows with the builder's hash table context. |
| Function* hash_fn; |
| codegen_status = ht_ctx_->CodegenHashRow(codegen, false, &hash_fn); |
| Function* murmur_hash_fn; |
| codegen_status.MergeStatus(ht_ctx_->CodegenHashRow(codegen, true, &murmur_hash_fn)); |
| |
| // Codegen for evaluating build rows |
| Function* eval_build_row_fn; |
| codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(codegen, true, &eval_build_row_fn)); |
| |
| if (codegen_status.ok()) { |
| TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode; |
| build_codegen_status = |
| CodegenProcessBuildBatch(codegen, hash_fn, murmur_hash_fn, eval_build_row_fn); |
| insert_codegen_status = CodegenInsertBatch(codegen, hash_fn, murmur_hash_fn, |
| eval_build_row_fn, prefetch_mode); |
| } else { |
| build_codegen_status = codegen_status; |
| insert_codegen_status = codegen_status; |
| } |
| profile()->AddCodegenMsg(build_codegen_status.ok(), build_codegen_status, "Build Side"); |
| profile()->AddCodegenMsg(insert_codegen_status.ok(), insert_codegen_status, |
| "Hash Table Construction"); |
| } |
| |
| string PhjBuilder::DebugString() const { |
| stringstream ss; |
| ss << "Hash partitions: " << hash_partitions_.size() << ":" << endl; |
| for (int i = 0; i < hash_partitions_.size(); ++i) { |
| Partition* partition = hash_partitions_[i]; |
| ss << " Hash partition " << i << " ptr=" << partition; |
| if (partition->IsClosed()) { |
| ss << " Closed"; |
| continue; |
| } |
| if (partition->is_spilled()) { |
| ss << " Spilled"; |
| } |
| DCHECK(partition->build_rows() != NULL); |
| ss << endl |
| << " Build Rows: " << partition->build_rows()->num_rows() |
| << " (Blocks pinned: " << partition->build_rows()->blocks_pinned() << ")" << endl; |
| if (partition->hash_tbl() != NULL) { |
| ss << " Hash Table Rows: " << partition->hash_tbl()->size() << endl; |
| } |
| } |
| return ss.str(); |
| } |
| |
| Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, |
| Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) { |
| Function* process_build_batch_fn = |
| codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true); |
| DCHECK(process_build_batch_fn != NULL); |
| |
| // Replace call sites |
| int replaced = |
| codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn, "EvalBuildRow"); |
| DCHECK_EQ(replaced, 1); |
| |
| // Replace some hash table parameters with constants. |
| HashTableCtx::HashTableReplacedConstants replaced_constants; |
| const bool stores_duplicates = true; |
| const int num_build_tuples = row_desc_.tuple_descriptors().size(); |
| RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates, |
| num_build_tuples, process_build_batch_fn, &replaced_constants)); |
| DCHECK_GE(replaced_constants.stores_nulls, 1); |
| DCHECK_EQ(replaced_constants.finds_some_nulls, 0); |
| DCHECK_EQ(replaced_constants.stores_duplicates, 0); |
| DCHECK_EQ(replaced_constants.stores_tuples, 0); |
| DCHECK_EQ(replaced_constants.quadratic_probing, 0); |
| |
| Function* process_build_batch_fn_level0 = |
| codegen->CloneFunction(process_build_batch_fn); |
| |
| // Always build runtime filters at level0 (if there are any). |
| // Note that the first argument of this function is the return value. |
| Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 4); |
| build_filters_l0_arg->replaceAllUsesWith( |
| ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0)); |
| |
| // process_build_batch_fn_level0 uses the CRC hash if available. |
| replaced = |
| codegen->ReplaceCallSites(process_build_batch_fn_level0, hash_fn, "HashRow"); |
| DCHECK_EQ(replaced, 1); |
| |
| // process_build_batch_fn uses the Murmur hash. |
| replaced = |
| codegen->ReplaceCallSites(process_build_batch_fn, murmur_hash_fn, "HashRow"); |
| DCHECK_EQ(replaced, 1); |
| |
| // Never build filters after repartitioning, as all rows have already been added to |
| // the filters during the level-0 build. Note that the first argument of this function |
| // is the return value. |
| Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 4); |
| build_filters_arg->replaceAllUsesWith( |
| ConstantInt::get(Type::getInt1Ty(codegen->context()), false)); |
| |
| // Finalize ProcessBuildBatch functions |
| process_build_batch_fn = codegen->FinalizeFunction(process_build_batch_fn); |
| if (process_build_batch_fn == NULL) { |
| return Status( |
| "Codegen'd PhjBuilder::ProcessBuildBatch() function " |
| "failed verification, see log"); |
| } |
| process_build_batch_fn_level0 = |
| codegen->FinalizeFunction(process_build_batch_fn_level0); |
| if (process_build_batch_fn_level0 == NULL) { |
| return Status( |
| "Codegen'd level-zero PhjBuilder::ProcessBuildBatch() " |
| "function failed verification, see log"); |
| } |
| |
| // Register native function pointers |
| codegen->AddFunctionToJit( |
| process_build_batch_fn, reinterpret_cast<void**>(&process_build_batch_fn_)); |
| codegen->AddFunctionToJit(process_build_batch_fn_level0, |
| reinterpret_cast<void**>(&process_build_batch_fn_level0_)); |
| return Status::OK(); |
| } |
| |
| Status PhjBuilder::CodegenInsertBatch(LlvmCodeGen* codegen, Function* hash_fn, |
| Function* murmur_hash_fn, Function* eval_row_fn, TPrefetchMode::type prefetch_mode) { |
| Function* insert_batch_fn = codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true); |
| Function* build_equals_fn; |
| RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &build_equals_fn)); |
| |
| // Replace the 'prefetch_mode' parameter with a constant. |
| Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1); |
| DCHECK_GE(prefetch_mode, TPrefetchMode::NONE); |
| DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET); |
| prefetch_mode_arg->replaceAllUsesWith( |
| ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode)); |
| |
| // Use codegen'd EvalBuildRow() function |
| int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow"); |
| DCHECK_EQ(replaced, 1); |
| |
| // Use codegen'd Equals() function |
| replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals"); |
| DCHECK_EQ(replaced, 1); |
| |
| // Replace hash-table parameters with constants. |
| HashTableCtx::HashTableReplacedConstants replaced_constants; |
| const bool stores_duplicates = true; |
| const int num_build_tuples = row_desc_.tuple_descriptors().size(); |
| RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates, |
| num_build_tuples, insert_batch_fn, &replaced_constants)); |
| DCHECK_GE(replaced_constants.stores_nulls, 1); |
| DCHECK_EQ(replaced_constants.finds_some_nulls, 0); |
| DCHECK_GE(replaced_constants.stores_duplicates, 1); |
| DCHECK_GE(replaced_constants.stores_tuples, 1); |
| DCHECK_GE(replaced_constants.quadratic_probing, 1); |
| |
| Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn); |
| |
| // Use codegen'd hash functions |
| replaced = codegen->ReplaceCallSites(insert_batch_fn_level0, hash_fn, "HashRow"); |
| DCHECK_EQ(replaced, 1); |
| replaced = codegen->ReplaceCallSites(insert_batch_fn, murmur_hash_fn, "HashRow"); |
| DCHECK_EQ(replaced, 1); |
| |
| insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn); |
| if (insert_batch_fn == NULL) { |
| return Status( |
| "PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd " |
| "InsertBatch() function failed verification, see log"); |
| } |
| insert_batch_fn_level0 = codegen->FinalizeFunction(insert_batch_fn_level0); |
| if (insert_batch_fn_level0 == NULL) { |
| return Status( |
| "PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd zero-level " |
| "InsertBatch() function failed verification, see log"); |
| } |
| |
| codegen->AddFunctionToJit(insert_batch_fn, reinterpret_cast<void**>(&insert_batch_fn_)); |
| codegen->AddFunctionToJit( |
| insert_batch_fn_level0, reinterpret_cast<void**>(&insert_batch_fn_level0_)); |
| return Status::OK(); |
| } |