| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/partitioned-hash-join-builder.h" |
| |
| #include <iomanip> |
| #include <numeric> |
| |
| #include <gutil/strings/substitute.h> |
| |
| #include "codegen/llvm-codegen.h" |
| #include "exec/hash-table.inline.h" |
| #include "exprs/scalar-expr-evaluator.h" |
| #include "exprs/scalar-expr.h" |
| #include "runtime/buffered-tuple-stream.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/fragment-state.h" |
| #include "runtime/mem-tracker.h" |
| #include "runtime/query-state.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/runtime-filter-bank.h" |
| #include "runtime/runtime-filter.h" |
| #include "runtime/runtime-state.h" |
| #include "util/bloom-filter.h" |
| #include "util/cyclic-barrier.h" |
| #include "util/debug-util.h" |
| #include "util/min-max-filter.h" |
| #include "util/pretty-printer.h" |
| #include "util/runtime-profile-counters.h" |
| |
| #include "gen-cpp/PlanNodes_types.h" |
| |
| #include "common/names.h" |
| |
// Error text for a stream that fails to acquire its initial read buffer. It is a
// Substitute() template: "$0" is presumably replaced with the hash join node id by the
// caller. NOTE(review): no use of this constant is visible in this part of the file;
// confirm it is still referenced elsewhere.
static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
    "Failed to acquire initial read "
    "buffer for stream in hash join node $0. Reducing query concurrency or increasing "
    "the memory limit may help this query to complete successfully.";
| |
| using namespace impala; |
| using strings::Substitute; |
| |
/// Builds the display name of the builder sink for the hash join node with id
/// 'join_node_id', e.g. "Hash Join Builder (join_node_id=3)".
static std::string ConstructBuilderName(int join_node_id) {
  std::string builder_name = "Hash Join Builder (join_node_id=";
  builder_name += std::to_string(join_node_id);
  builder_name += ')';
  return builder_name;
}
| |
| DataSink* PhjBuilderConfig::CreateSink(RuntimeState* state) const { |
| // We have one fragment per sink, so we can use the fragment index as the sink ID. |
| TDataSinkId sink_id = state->fragment().idx; |
| ObjectPool* pool = state->obj_pool(); |
| return pool->Add(new PhjBuilder(sink_id, *this, state)); |
| } |
| |
| PhjBuilder* PhjBuilderConfig::CreateSink(BufferPool::ClientHandle* buffer_pool_client, |
| int64_t spillable_buffer_size, int64_t max_row_buffer_size, |
| RuntimeState* state) const { |
| ObjectPool* pool = state->obj_pool(); |
| return pool->Add(new PhjBuilder(*this, buffer_pool_client, spillable_buffer_size, |
| max_row_buffer_size, state)); |
| } |
| |
| Status PhjBuilderConfig::CreateConfig(FragmentState* state, int join_node_id, |
| TJoinOp::type join_op, const RowDescriptor* build_row_desc, |
| const std::vector<TEqJoinCondition>& eq_join_conjuncts, |
| const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed, |
| PhjBuilderConfig** sink) { |
| ObjectPool* pool = state->obj_pool(); |
| TDataSink* tsink = pool->Add(new TDataSink()); |
| PhjBuilderConfig* data_sink = pool->Add(new PhjBuilderConfig()); |
| RETURN_IF_ERROR(data_sink->Init(state, join_node_id, join_op, build_row_desc, |
| eq_join_conjuncts, filters, hash_seed, tsink)); |
| *sink = data_sink; |
| return Status::OK(); |
| } |
| |
/// Closes the build and runtime-filter source expressions created by
/// InitExprsAndFilters(), then closes the base DataSinkConfig.
void PhjBuilderConfig::Close() {
  ScalarExpr::Close(build_exprs_);
  ScalarExpr::Close(filter_exprs_);
  DataSinkConfig::Close();
}
| |
| Status PhjBuilderConfig::InitExprsAndFilters(FragmentState* state, |
| const vector<TEqJoinCondition>& eq_join_conjuncts, |
| const vector<TRuntimeFilterDesc>& filter_descs) { |
| for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) { |
| ScalarExpr* build_expr; |
| RETURN_IF_ERROR( |
| ScalarExpr::Create(eq_join_conjunct.right, *input_row_desc_, state, &build_expr)); |
| build_exprs_.push_back(build_expr); |
| is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from); |
| } |
| |
| for (const TRuntimeFilterDesc& filter_desc : filter_descs) { |
| DCHECK(state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL || |
| filter_desc.is_broadcast_join || state->query_options().num_nodes == 1); |
| DCHECK(!state->query_options().disable_row_runtime_filtering || |
| filter_desc.applied_on_partition_columns); |
| // Skip over filters that are not produced by the instances of the builder, i.e. |
| // broadcast filters where this instance was not selected as a filter producer. |
| const vector<const TPlanFragmentInstanceCtx*>& instance_ctxs = state->instance_ctxs(); |
| // We can pick any instance since the filters produced should be the same for all |
| // instances. |
| const vector<TRuntimeFilterSource>& filters_produced = |
| instance_ctxs[0]->filters_produced; |
| auto it = std::find_if(filters_produced.begin(), filters_produced.end(), |
| [this, &filter_desc](const TRuntimeFilterSource f) { |
| return f.src_node_id == join_node_id_ && f.filter_id == filter_desc.filter_id; |
| }); |
| if (it == filters_produced.end()) continue; |
| ScalarExpr* filter_expr; |
| RETURN_IF_ERROR( |
| ScalarExpr::Create(filter_desc.src_expr, *input_row_desc_, state, &filter_expr)); |
| filter_exprs_.push_back(filter_expr); |
| filter_descs_.push_back(filter_desc); |
| } |
| |
| hash_table_config_ = state->obj_pool()->Add(new HashTableConfig(build_exprs_, |
| build_exprs_, PhjBuilder::HashTableStoresNulls(join_op_, is_not_distinct_from_), |
| is_not_distinct_from_)); |
| state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_); |
| return Status::OK(); |
| } |
| |
| Status PhjBuilderConfig::Init(FragmentState* state, int join_node_id, |
| TJoinOp::type join_op, const RowDescriptor* build_row_desc, |
| const vector<TEqJoinCondition>& eq_join_conjuncts, |
| const vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed, TDataSink* tsink) { |
| tsink->__isset.join_build_sink = true; |
| tsink->join_build_sink.__set_dest_node_id(join_node_id); |
| tsink->join_build_sink.__set_join_op(join_op); |
| RETURN_IF_ERROR(JoinBuilderConfig::Init(*tsink, build_row_desc, state)); |
| hash_seed_ = hash_seed; |
| return InitExprsAndFilters(state, eq_join_conjuncts, filters); |
| } |
| |
| Status PhjBuilderConfig::Init(const TDataSink& tsink, const RowDescriptor* input_row_desc, |
| FragmentState* state) { |
| RETURN_IF_ERROR(JoinBuilderConfig::Init(tsink, input_row_desc, state)); |
| const TJoinBuildSink& build_sink = tsink.join_build_sink; |
| hash_seed_ = build_sink.hash_seed; |
| resource_profile_ = &tsink.resource_profile; |
| return InitExprsAndFilters( |
| state, tsink.join_build_sink.eq_join_conjuncts, build_sink.runtime_filters); |
| } |
| |
// Name of the PhjBuilder struct type as it appears in LLVM IR. NOTE(review): presumably
// looked up by the codegen code; the use is not visible in this part of the file.
const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
| |
/// Constructs a builder that is a standalone sink with id 'sink_id'; this is the
/// builder created by PhjBuilderConfig::CreateSink(RuntimeState*). Buffer sizes come
/// from the config's resource profile, so 'sink_config.resource_profile_' must be
/// non-null. It is set by the TDataSink-based PhjBuilderConfig::Init(). The buffer pool
/// client is obtained from 'reservation_manager_'. One runtime filter producer is
/// registered per filter in 'sink_config.filter_descs_'.
PhjBuilder::PhjBuilder(
    TDataSinkId sink_id, const PhjBuilderConfig& sink_config, RuntimeState* state)
  : JoinBuilder(
        sink_id, sink_config, ConstructBuilderName(sink_config.join_node_id_), state),
    runtime_state_(state),
    hash_seed_(sink_config.hash_seed_),
    // NOTE: members are initialized in declaration order, not list order. The buffer
    // sizes below dereference 'resource_profile_', and 'buffer_pool_client_' reads
    // 'reservation_manager_'. Those members must therefore be declared earlier in the
    // class than the members that use them.
    resource_profile_(sink_config.resource_profile_),
    reservation_manager_(),
    buffer_pool_client_(reservation_manager_.buffer_pool_client()),
    spillable_buffer_size_(resource_profile_->spillable_buffer_size),
    max_row_buffer_size_(resource_profile_->max_row_buffer_size),
    build_exprs_(sink_config.build_exprs_),
    is_not_distinct_from_(sink_config.is_not_distinct_from_),
    filter_exprs_(sink_config.filter_exprs_),
    hash_table_config_(*sink_config.hash_table_config_),
    // A barrier is only needed when several probe threads share this build.
    // 'num_probe_threads_' is presumably set by the JoinBuilder base constructor.
    probe_barrier_(num_probe_threads_ <= 1 ?
            nullptr :
            make_unique<CyclicBarrier>(state->instance_ctx().num_join_build_outputs)),
    process_build_batch_fn_(sink_config.process_build_batch_fn_),
    process_build_batch_fn_level0_(sink_config.process_build_batch_fn_level0_),
    insert_batch_fn_(sink_config.insert_batch_fn_),
    insert_batch_fn_level0_(sink_config.insert_batch_fn_level0_) {
  DCHECK_GT(sink_config.hash_seed_, 0);
  DCHECK(num_probe_threads_ <= 1 || !NeedToProcessUnmatchedBuildRows(join_op_))
      << "Returning rows with build partitions is not supported with shared builds";
  for (const TRuntimeFilterDesc& filter_desc : sink_config.filter_descs_) {
    filter_ctxs_.emplace_back();
    filter_ctxs_.back().filter = state->filter_bank()->RegisterProducer(filter_desc);
  }
  // Ensure threads get unblocked from probe_barrier_ when the query is cancelled. Using
  // the AddBarrierToCancel() mechanism ensures that cancellation happens after the
  // overall error for this backend has already been set in QueryState. Otherwise this
  // status and the original status could race with each other to become this backend's
  // status.
  if (probe_barrier_ != nullptr) state->AddBarrierToCancel(probe_barrier_.get());
}
| |
/// Constructs a builder embedded in the join node; this is the builder created by
/// PhjBuilderConfig::CreateSink(BufferPool::ClientHandle*, ...). It has no sink id (-1)
/// and no resource profile. It uses the caller's 'buffer_pool_client' and buffer sizes.
/// Embedded builders are never shared between probe threads, so no probe barrier is
/// created. One runtime filter producer is registered per filter in
/// 'sink_config.filter_descs_'.
PhjBuilder::PhjBuilder(const PhjBuilderConfig& sink_config,
    BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
    int64_t max_row_buffer_size, RuntimeState* state)
  : JoinBuilder(-1, sink_config, ConstructBuilderName(sink_config.join_node_id_), state),
    runtime_state_(state),
    hash_seed_(sink_config.hash_seed_),
    resource_profile_(nullptr),
    reservation_manager_(),
    buffer_pool_client_(buffer_pool_client),
    spillable_buffer_size_(spillable_buffer_size),
    max_row_buffer_size_(max_row_buffer_size),
    build_exprs_(sink_config.build_exprs_),
    is_not_distinct_from_(sink_config.is_not_distinct_from_),
    filter_exprs_(sink_config.filter_exprs_),
    hash_table_config_(*sink_config.hash_table_config_),
    probe_barrier_(nullptr),
    process_build_batch_fn_(sink_config.process_build_batch_fn_),
    process_build_batch_fn_level0_(sink_config.process_build_batch_fn_level0_),
    insert_batch_fn_(sink_config.insert_batch_fn_),
    insert_batch_fn_level0_(sink_config.insert_batch_fn_level0_) {
  DCHECK_GT(sink_config.hash_seed_, 0);
  DCHECK_EQ(1, num_probe_threads_) << "Embedded builders cannot be shared";
  for (const TRuntimeFilterDesc& filter_desc : sink_config.filter_descs_) {
    filter_ctxs_.emplace_back();
    filter_ctxs_.back().filter = state->filter_bank()->RegisterProducer(filter_desc);
  }
}
| |
| PhjBuilder::~PhjBuilder() {} |
| |
/// Prepares the builder for execution. This sets up the reservation manager when this
/// is a separate build, creates the hash table context and one evaluator per runtime
/// filter expression, and registers this builder's profile counters and timers.
Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
  if (is_separate_build_) {
    // Debug options apply if they target all nodes (node_id == -1) or this join node.
    const TDebugOptions& instance_debug_options = state->instance_ctx().debug_options;
    bool debug_option_enabled = instance_debug_options.node_id == -1
        || instance_debug_options.node_id == join_node_id_;
    // SET_DENY_RESERVATION_PROBABILITY should behave the same as if it were applied to
    // the join node.
    reservation_manager_.Init(Substitute("$0 ptr=$1", name_, this), profile(),
        state->instance_buffer_reservation(), mem_tracker_.get(), *resource_profile_,
        debug_option_enabled ? instance_debug_options : TDebugOptions());
  }

  // The context is created for partition levels up to MAX_PARTITION_DEPTH.
  // NOTE(review): presumably one level per repartitioning round; confirm in
  // hash-table.h.
  RETURN_IF_ERROR(HashTableCtx::Create(&obj_pool_, state, hash_table_config_, hash_seed_,
      MAX_PARTITION_DEPTH, row_desc_->tuple_descriptors().size(), expr_perm_pool_.get(),
      expr_results_pool_.get(), expr_results_pool_.get(), &ht_ctx_));

  // 'filter_ctxs_' was populated in the constructor, one entry per filter expression.
  DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
  for (int i = 0; i < filter_exprs_.size(); ++i) {
    RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, &obj_pool_,
        expr_perm_pool_.get(), expr_results_pool_.get(), &filter_ctxs_[i].expr_eval));
  }

  partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
  largest_partition_percent_ =
      profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
  max_partition_level_ =
      profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT);
  num_build_rows_ = ADD_COUNTER(profile(), "BuildRows", TUnit::UNIT);
  ht_stats_profile_ = HashTable::AddHashTableCounters(profile());
  num_spilled_partitions_ = ADD_COUNTER(profile(), "SpilledPartitions", TUnit::UNIT);
  num_repartitions_ = ADD_COUNTER(profile(), "NumRepartitions", TUnit::UNIT);
  partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime");
  build_hash_table_timer_ = ADD_TIMER(profile(), "HashTablesBuildTime");
  num_hash_table_builds_skipped_ =
      ADD_COUNTER(profile(), "NumHashTableBuildsSkipped", TUnit::UNIT);
  repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime");
  return Status::OK();
}
| |
/// Opens the builder and creates the initial (level 0) hash partitions. For
/// NULL_AWARE_LEFT_ANTI_JOIN it also creates the null-aware partition. The probe stream
/// reservation and the hash table allocator are only set up if they do not exist yet,
/// which allows Open() to be called more than once.
Status PhjBuilder::Open(RuntimeState* state) {
  SCOPED_TIMER(profile()->total_time_counter());
  RETURN_IF_ERROR(DataSink::Open(state));
  // A separate build claims its own reservation here. An embedded builder uses the
  // join node's client, which the node has already registered.
  if (!buffer_pool_client_->is_registered()) {
    DCHECK(is_separate_build_) << "Client is registered by PhjNode if not separate";
    DCHECK_GE(resource_profile_->min_reservation, MinReservation().second);
    RETURN_IF_ERROR(reservation_manager_.ClaimBufferReservation(state));
  }
  // Need to init here instead of constructor so that buffer_pool_client_ is registered.
  if (probe_stream_reservation_.is_closed()) {
    probe_stream_reservation_.Init(buffer_pool_client_);
  }

  RETURN_IF_ERROR(ht_ctx_->Open(state));

  for (const FilterContext& ctx : filter_ctxs_) {
    RETURN_IF_ERROR(ctx.expr_eval->Open(state));
  }
  if (ht_allocator_ == nullptr) {
    // Create 'ht_allocator_' on the first call to Open().
    ht_allocator_.reset(new Suballocator(ExecEnv::GetInstance()->buffer_pool(),
        buffer_pool_client_, spillable_buffer_size_));
  }
  // The build always starts by partitioning at level 0.
  RETURN_IF_ERROR(CreateHashPartitions(0));
  AllocateRuntimeFilters();

  // Only NULL_AWARE_LEFT_ANTI_JOIN uses the extra null-aware partition.
  if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
    RETURN_IF_ERROR(CreateAndPreparePartition(0, &null_aware_partition_));
  }
  return Status::OK();
}
| |
| Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) { |
| SCOPED_TIMER(profile()->total_time_counter()); |
| SCOPED_TIMER(partition_build_rows_timer_); |
| RETURN_IF_ERROR(AddBatch(batch)); |
| COUNTER_ADD(num_build_rows_, batch->num_rows()); |
| return Status::OK(); |
| } |
| |
| Status PhjBuilder::AddBatch(RowBatch* batch) { |
| bool build_filters = ht_ctx_->level() == 0 && filter_ctxs_.size() > 0; |
| |
| PhjBuilderConfig::ProcessBuildBatchFn process_build_batch_fn; |
| if (ht_ctx_->level() == 0) { |
| process_build_batch_fn = process_build_batch_fn_level0_.load(); |
| } else { |
| process_build_batch_fn = process_build_batch_fn_.load(); |
| } |
| |
| if (process_build_batch_fn != nullptr) { |
| RETURN_IF_ERROR(process_build_batch_fn(this, batch, ht_ctx_.get(), build_filters, |
| join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN)); |
| } else { |
| RETURN_IF_ERROR(ProcessBuildBatch(batch, ht_ctx_.get(), build_filters, |
| join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN)); |
| } |
| // Free any expr result allocations made during partitioning. |
| expr_results_pool_->Clear(); |
| return Status::OK(); |
| } |
| |
| Status PhjBuilder::FlushFinal(RuntimeState* state) { |
| SCOPED_TIMER(profile()->total_time_counter()); |
| return FinalizeBuild(state); |
| } |
| |
/// Finishes the current build phase and advances to the matching probe phase. It:
///   1. marks every partition's build stream as done for writing;
///   2. records partition statistics;
///   3. at level 0, publishes runtime filters;
///   4. builds hash tables for in-memory partitions and reserves probe buffers.
/// For a separate build that has just entered PARTITIONING_PROBE, it then calls
/// HandoffToProbesAndWait().
Status PhjBuilder::FinalizeBuild(RuntimeState* state) {
  // Total the build rows across partitions; no more rows will be written to them.
  int64_t num_build_rows = 0;
  for (const unique_ptr<PhjBuilderPartition>& partition : hash_partitions_) {
    num_build_rows += partition->build_rows()->num_rows();
    partition->build_rows()->DoneWriting();
  }

  // Record the largest partition's share of the build rows as a percentage.
  if (num_build_rows > 0) {
    double largest_fraction = 0.0;
    for (const unique_ptr<PhjBuilderPartition>& partition : hash_partitions_) {
      largest_fraction = max(largest_fraction,
          partition->build_rows()->num_rows() / static_cast<double>(num_build_rows));
    }
    COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(largest_fraction * 100));
  }

  if (VLOG_IS_ON(2)) {
    stringstream ss;
    ss << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:", join_node_id_,
        hash_partitions_[0]->level(), num_build_rows);
    for (int i = 0; i < hash_partitions_.size(); ++i) {
      PhjBuilderPartition* partition = hash_partitions_[i].get();
      double percent = num_build_rows == 0 ? 0.0 : partition->build_rows()->num_rows()
          * 100 / static_cast<double>(num_build_rows);
      ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
         << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
         << " #rows:" << partition->build_rows()->num_rows() << endl;
    }
    if (null_aware_partition_ != nullptr) {
      ss << " Null-aware partition: " << null_aware_partition_->DebugString();
    }
    VLOG(2) << ss.str();
  }

  // Runtime filters are only populated from the level 0 build input (see AddBatch()),
  // so they are published only at the end of the level 0 build.
  if (ht_ctx_->level() == 0) {
    PublishRuntimeFilters(num_build_rows);
    non_empty_build_ |= (num_build_rows > 0);

    if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
      if (null_aware_partition_->is_spilled()) {
        // Free up memory for the hash tables of other partitions by unpinning the
        // last block of the null aware partition's stream.
        RETURN_IF_ERROR(null_aware_partition_->Spill(BufferedTupleStream::UNPIN_ALL));
      } else {
        // Invalidate the write iterator so we can safely do concurrent reads later.
        null_aware_partition_->build_rows()->DoneWriting();
      }
    }
  }

  // Each build phase is followed by its corresponding probe phase.
  HashJoinState next_state;
  if (state_ == HashJoinState::PARTITIONING_BUILD) {
    next_state = HashJoinState::PARTITIONING_PROBE;
  } else {
    DCHECK_ENUM_EQ(state_, HashJoinState::REPARTITIONING_BUILD);
    next_state = HashJoinState::REPARTITIONING_PROBE;
  }
  RETURN_IF_ERROR(BuildHashTablesAndReserveProbeBuffers(next_state));
  UpdateState(next_state);
  if (state_ == HashJoinState::PARTITIONING_PROBE && is_separate_build_) {
    HandoffToProbesAndWait(state);
  }
  return Status::OK();
}
| |
/// Releases the partitions, the hash table context (after adding its statistics to the
/// profile), the filter evaluators, the probe stream reservation and, for a separate
/// build, the reservation manager. Calls after the first one are no-ops.
void PhjBuilder::Close(RuntimeState* state) {
  if (closed_) return;
  CloseAndDeletePartitions(nullptr);
  if (ht_ctx_ != nullptr) {
    ht_ctx_->StatsCountersAdd(ht_stats_profile_.get());
    ht_ctx_->Close(state);
    ht_ctx_.reset();
  }
  // Evaluators may be missing, e.g. if Prepare() did not get far enough to create them.
  for (const FilterContext& ctx : filter_ctxs_) {
    if (ctx.expr_eval != nullptr) ctx.expr_eval->Close(state);
  }
  obj_pool_.Clear();
  probe_stream_reservation_.Close();
  if (is_separate_build_) reservation_manager_.Close(state);
  DataSink::Close(state);
  closed_ = true;
}
| |
/// Returns the builder to the PARTITIONING_BUILD state so that a new build can start.
/// Only valid for a builder embedded in the join node, and only after all probe stream
/// reservation has been given back. The current partitions are deleted via
/// CloseAndDeletePartitions(row_batch).
void PhjBuilder::Reset(RowBatch* row_batch) {
  DCHECK(!is_separate_build_);
  DCHECK_EQ(0, probe_stream_reservation_.GetReservation());
  state_ = HashJoinState::PARTITIONING_BUILD;
  expr_results_pool_->Clear();
  non_empty_build_ = false;
  next_partition_id_ = 0;
  CloseAndDeletePartitions(row_batch);
}
| |
/// Moves the builder to 'next_state'. In debug builds it checks that the transition is
/// one of the allowed ones:
///   PARTITIONING_BUILD -> PARTITIONING_PROBE
///   PARTITIONING_PROBE, REPARTITIONING_PROBE, PROBING_SPILLED_PARTITION
///       -> REPARTITIONING_BUILD or PROBING_SPILLED_PARTITION
///   REPARTITIONING_BUILD -> REPARTITIONING_PROBE
void PhjBuilder::UpdateState(HashJoinState next_state) {
  // Validate the state transition.
  switch (state_) {
    case HashJoinState::PARTITIONING_BUILD:
      DCHECK_ENUM_EQ(next_state, HashJoinState::PARTITIONING_PROBE);
      break;
    case HashJoinState::PARTITIONING_PROBE:
    case HashJoinState::REPARTITIONING_PROBE:
    case HashJoinState::PROBING_SPILLED_PARTITION:
      DCHECK(next_state == HashJoinState::REPARTITIONING_BUILD
          || next_state == HashJoinState::PROBING_SPILLED_PARTITION);
      break;
    case HashJoinState::REPARTITIONING_BUILD:
      DCHECK_ENUM_EQ(next_state, HashJoinState::REPARTITIONING_PROBE);
      break;
    default:
      DCHECK(false) << "Invalid state " << static_cast<int>(state_);
  }
  state_ = next_state;
  VLOG(2) << "Transitioned State:" << endl << DebugString();
}
| |
| string PhjBuilder::PrintState(HashJoinState state) { |
| switch (state) { |
| case HashJoinState::PARTITIONING_BUILD: |
| return "PartitioningBuild"; |
| case HashJoinState::PARTITIONING_PROBE: |
| return "PartitioningProbe"; |
| case HashJoinState::PROBING_SPILLED_PARTITION: |
| return "ProbingSpilledPartition"; |
| case HashJoinState::REPARTITIONING_BUILD: |
| return "RepartitioningBuild"; |
| case HashJoinState::REPARTITIONING_PROBE: |
| return "RepartitioningProbe"; |
| default: |
| DCHECK(false); |
| } |
| return ""; |
| } |
| |
| Status PhjBuilder::CreateAndPreparePartition( |
| int level, unique_ptr<PhjBuilderPartition>* partition) { |
| *partition = make_unique<PhjBuilderPartition>(runtime_state_, this, level); |
| Status status = (*partition)->build_rows()->Init(name_, true); |
| if (!status.ok()) goto error; |
| bool got_buffer; |
| status = (*partition)->build_rows()->PrepareForWrite(&got_buffer); |
| if (!status.ok()) goto error; |
| DCHECK(got_buffer) |
| << "Accounted in min reservation" << buffer_pool_client_->DebugString(); |
| return Status::OK(); |
| error: |
| (*partition)->Close(nullptr); |
| partition->reset(); |
| return status; |
| } |
| |
| Status PhjBuilder::CreateHashPartitions(int level) { |
| DCHECK(hash_partitions_.empty()); |
| ht_ctx_->set_level(level); // Set the hash function for partitioning input. |
| for (int i = 0; i < PARTITION_FANOUT; ++i) { |
| unique_ptr<PhjBuilderPartition> new_partition; |
| RETURN_IF_ERROR(CreateAndPreparePartition(level, &new_partition)); |
| hash_partitions_.push_back(std::move(new_partition)); |
| } |
| COUNTER_ADD(partitions_created_, PARTITION_FANOUT); |
| COUNTER_SET(max_partition_level_, level); |
| return Status::OK(); |
| } |
| |
| bool PhjBuilder::AppendRowStreamFull( |
| BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept { |
| while (true) { |
| // We ran out of memory. Pick a partition to spill. If we ran out of unspilled |
| // partitions, SpillPartition() will return an error status. |
| *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); |
| if (!status->ok()) return false; |
| if (stream->AddRow(row, status)) return true; |
| if (!status->ok()) return false; |
| // Spilling one partition does not guarantee we can append a row. Keep |
| // spilling until we can append this row. |
| } |
| } |
| |
| // TODO: can we do better with a different spilling heuristic? |
| Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode, |
| PhjBuilderPartition** spilled_partition) { |
| DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT); |
| PhjBuilderPartition* best_candidate = nullptr; |
| if (null_aware_partition_ != nullptr && null_aware_partition_->CanSpill()) { |
| // Spill null-aware partition first if possible - it is always processed last. |
| best_candidate = null_aware_partition_.get(); |
| } else { |
| // Iterate over the partitions and pick the largest partition to spill. |
| int64_t max_freed_mem = 0; |
| for (const unique_ptr<PhjBuilderPartition>& candidate : hash_partitions_) { |
| if (!candidate->CanSpill()) continue; |
| int64_t mem = candidate->build_rows()->BytesPinned(false); |
| if (candidate->hash_tbl() != nullptr) { |
| // The hash table should not have matches, since we have not probed it yet. |
| // Losing match info would lead to incorrect results (IMPALA-1488). |
| DCHECK(!candidate->hash_tbl()->HasMatches()); |
| mem += candidate->hash_tbl()->ByteSize(); |
| } |
| if (mem > max_freed_mem) { |
| max_freed_mem = mem; |
| best_candidate = candidate.get(); |
| } |
| } |
| } |
| |
| if (best_candidate == nullptr) { |
| return Status(Substitute("Internal error: could not find a partition to spill in " |
| " hash join $0: \n$1\nClient:\n$2", |
| join_node_id_, DebugString(), buffer_pool_client_->DebugString())); |
| } |
| |
| VLOG(2) << "Spilling partition: " << best_candidate->DebugString() << endl |
| << DebugString(); |
| RETURN_IF_ERROR(best_candidate->Spill(mode)); |
| if (spilled_partition != nullptr) *spilled_partition = best_candidate; |
| return Status::OK(); |
| } |
| |
// Called once the current build input has been fully consumed. That input is either
// the child ExecNode's rows or the rows of a spilled partition being repartitioned. At
// this point the build rows have only been partitioned, and hash tables still need to
// be built over them. Some partitions may already have been spilled, and building hash
// tables over the remaining in-memory partitions may cause further spilling.
//
// When the function returns:
// - empty partitions are closed;
// - every other partition either has a hash table (and is therefore not spilled) or is
//   spilled.
// Partitions with hash tables don't need to spill on the probe side.
//
// Choosing which partitions to keep in memory is a 0-1 knapsack problem. The weight is
// the memory needed for a partition's build rows and hash table, and the value is the
// expected IO savings. For now, we go with a greedy solution.
//
// TODO: implement the knapsack solution.
Status PhjBuilder::BuildHashTablesAndReserveProbeBuffers(HashJoinState next_state) {
  DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());

  for (int i = 0; i < PARTITION_FANOUT; ++i) {
    PhjBuilderPartition* partition = hash_partitions_[i].get();
    if (partition->build_rows()->num_rows() == 0) {
      // This partition is empty, no need to do anything else.
      partition->Close(nullptr);
    } else if (partition->is_spilled()) {
      // We don't need any build-side data for spilled partitions in memory.
      RETURN_IF_ERROR(
          partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
    }
  }

  // TODO: the below logic could be improved calculating upfront how much memory is needed
  // for each hash table, and only building hash tables that will eventually fit in
  // memory. In some cases now we could build a hash table, then spill the partition
  // later.

  // Allocate probe buffers for all partitions that are already spilled. Do this before
  // building hash tables because allocating probe buffers may cause some more partitions
  // to be spilled. This avoids wasted work on building hash tables for partitions that
  // won't fit in memory alongside the required probe buffers.
  RETURN_IF_ERROR(ReserveProbeBuffers(next_state));

  for (int i = 0; i < PARTITION_FANOUT; ++i) {
    PhjBuilderPartition* partition = hash_partitions_[i].get();
    if (partition->IsClosed() || partition->is_spilled()) continue;

    bool built = false;
    DCHECK(partition->build_rows()->is_pinned());
    RETURN_IF_ERROR(partition->BuildHashTable(&built));
    // If we did not have enough memory to build this hash table, we need to spill this
    // partition (clean up the hash table, unpin build).
    if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL));
  }
  // We may have spilled additional partitions while building hash tables, we need to
  // reserve memory for the probe buffers for those additional spilled partitions.
  RETURN_IF_ERROR(ReserveProbeBuffers(next_state));
  if (is_separate_build_) {
    // The builder may have some surplus reservation. Release it so that it can be
    // used by the probe side or by other operators.
    RETURN_IF_ERROR(reservation_manager_.ReleaseUnusedReservation());
  }
  return Status::OK();
}
| |
/// Tops up 'probe_stream_reservation_' to the probe-stream reservation that 'next_state'
/// needs for all 'num_probe_threads_' probe threads. While 'buffer_pool_client_' lacks
/// enough unused reservation, it spills partitions to make room. Returns an error if no
/// partition is left to spill.
Status PhjBuilder::ReserveProbeBuffers(HashJoinState next_state) {
  DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
  int64_t curr_reservation = probe_stream_reservation_.GetReservation();
  // Only the shortfall beyond what is already saved needs to be reserved.
  int64_t addtl_reservation =
      CalcProbeStreamReservation(next_state) * num_probe_threads_ - curr_reservation;

  // Loop until either we get enough reservation or all partitions are spilled (in which
  // case SpillPartition() returns an error).
  while (addtl_reservation > buffer_pool_client_->GetUnusedReservation()) {
    PhjBuilderPartition* spilled_partition;
    RETURN_IF_ERROR(SpillPartition(
        BufferedTupleStream::UNPIN_ALL, &spilled_partition));
    // Increase reservation to reflect the additional spilled partition.
    // Don't need to create a probe stream for the null-aware partition.
    if (spilled_partition != null_aware_partition_.get()) {
      addtl_reservation += spillable_buffer_size_ * num_probe_threads_;
    }
  }
  VLOG(3) << "PHJ(node_id=" << join_node_id_ << ") saved reservation "
          << addtl_reservation;
  buffer_pool_client_->SaveReservation(&probe_stream_reservation_, addtl_reservation);
  return Status::OK();
}
| |
| Status PhjBuilder::BeginInitialProbe( |
| BufferPool::ClientHandle* probe_client, HashPartitions* partitions) { |
| DCHECK_EQ(is_separate_build_, probe_client != buffer_pool_client_); |
| DCHECK_ENUM_EQ(state_, HashJoinState::PARTITIONING_PROBE); |
| DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size()); |
| RETURN_IF_ERROR(TransferProbeStreamReservation(probe_client)); |
| *partitions = HashPartitions(ht_ctx_->level(), &hash_partitions_, non_empty_build_); |
| return Status::OK(); |
| } |
| |
/// Moves the probe-stream reservation for the current state out of
/// 'probe_stream_reservation_' and back into 'buffer_pool_client_'. For a separate
/// build, it then transfers that reservation on to 'probe_client'. For an embedded
/// builder, 'probe_client' is the builder's own client, so restoring is enough.
Status PhjBuilder::TransferProbeStreamReservation(
    BufferPool::ClientHandle* probe_client) {
  DCHECK_EQ(is_separate_build_, probe_client != buffer_pool_client_);
  int64_t probe_reservation = CalcProbeStreamReservation(state_);
  VLOG(3) << "PHJ(node_id=" << join_node_id_ << ") will transfer " << probe_reservation
          << " to probe client.";
  if (probe_reservation == 0) return Status::OK();
  DCHECK_GE(probe_stream_reservation_.GetReservation(), probe_reservation);

  buffer_pool_client_->RestoreReservation(&probe_stream_reservation_, probe_reservation);
  if (is_separate_build_) {
    bool success;
    RETURN_IF_ERROR(buffer_pool_client_->TransferReservationTo(
        probe_client, probe_reservation, &success));
    DCHECK(success) << "Transferring within query shouldn't violate reservation limits.";
  }
  return Status::OK();
}
| |
| int64_t PhjBuilder::CalcProbeStreamReservation(HashJoinState next_state) const { |
| // We need a read buffer if the input is a spilled partition (i.e. we are repartitioning |
| // the input). |
| bool need_probe_buffer; |
| if (next_state == HashJoinState::PARTITIONING_PROBE) { |
| need_probe_buffer = false; |
| } else { |
| DCHECK(next_state == HashJoinState::PROBING_SPILLED_PARTITION |
| || next_state == HashJoinState::REPARTITIONING_PROBE) |
| << static_cast<int>(next_state); |
| DCHECK_GT(spilled_partitions_.size(), 0); |
| need_probe_buffer = spilled_partitions_.back()->num_spilled_probe_rows() > 0; |
| } |
| DCHECK(next_state == HashJoinState::PROBING_SPILLED_PARTITION |
| || hash_partitions_.size() > 0); |
| int num_spilled_partitions = GetNumSpilledPartitions(hash_partitions_); |
| int num_buffers = num_spilled_partitions + (need_probe_buffer ? 1 : 0); |
| int num_max_sized_buffers = |
| (num_spilled_partitions > 0 ? 1 : 0) + (need_probe_buffer ? 1 : 0); |
| DCHECK_LE(num_max_sized_buffers, num_buffers); |
| return num_max_sized_buffers * max_row_buffer_size_ + |
| (num_buffers - num_max_sized_buffers) * spillable_buffer_size_; |
| } |
| |
| int PhjBuilder::GetNumSpilledPartitions( |
| const vector<unique_ptr<PhjBuilderPartition>>& partitions) { |
| int num_spilled = 0; |
| for (int i = 0; i < partitions.size(); ++i) { |
| PhjBuilderPartition* partition = partitions[i].get(); |
| DCHECK(partition != nullptr); |
| if (!partition->IsClosed() && partition->is_spilled()) ++num_spilled; |
| } |
| return num_spilled; |
| } |
| |
/// Called by a probe thread when it has finished probing the current hash partitions.
/// It adds this thread's 'num_spilled_probe_rows' (indexed by partition) to the open
/// partitions, then cleans up the hash partitions via CleanUpHashPartitions(), which
/// fills 'output_partitions' (must be empty on entry). For a separate build, it finally
/// transfers the probe reservation back from 'probe_client' to this builder's client.
Status PhjBuilder::DoneProbingHashPartitions(
    const int64_t num_spilled_probe_rows[PARTITION_FANOUT],
    BufferPool::ClientHandle* probe_client, RuntimeProfile* probe_profile,
    deque<unique_ptr<PhjBuilderPartition>>* output_partitions, RowBatch* batch) {
  DCHECK_EQ(is_separate_build_, probe_client != buffer_pool_client_);
  DCHECK(output_partitions->empty());
  VLOG(3) << "PHJ(node_id=" << join_node_id_ << ") done probing hash partitions.";
  // Calculate the reservation before cleaning up 'hash_partitions_' and
  // 'spilled_partitions_', which the calculation depends on.
  int64_t probe_reservation = CalcProbeStreamReservation(state_);
  DCHECK_GE(probe_client->GetUnusedReservation(), probe_reservation);

  // Merge together num_spilled_probe_rows to include info from all threads.
  for (int i = 0; i < PARTITION_FANOUT; ++i) {
    PhjBuilderPartition* partition = hash_partitions_[i].get();
    if (partition->IsClosed()) continue;
    partition->IncrementNumSpilledProbeRows(num_spilled_probe_rows[i]);
  }

  if (num_probe_threads_ > 1) {
    // Shared build: the clean-up is passed to the barrier. NOTE(review): presumably the
    // barrier runs it once after all probe threads arrive; see cyclic-barrier.h. No
    // batch is passed, so shared builds cannot output build partitions (DCHECK below).
    SCOPED_TIMER(probe_profile->inactive_timer());
    // TODO: IMPALA-9411: consider reworking to attach buffers to all output batches.
    RETURN_IF_ERROR(probe_barrier_->Wait([&]() {
      CleanUpHashPartitions(output_partitions, nullptr);
      DCHECK_EQ(0, output_partitions->size())
          << "Cannot share build for join modes that return rows from build partitions";
      return Status::OK();
    }));
  } else if (is_separate_build_) {
    SCOPED_TIMER(probe_profile->inactive_timer());
    CleanUpHashPartitions(output_partitions, batch);
  } else {
    // No need to activate probe's inactive timer, since the builder will be a child of
    // the probe and its time will be subtracted from probe's total time.
    CleanUpHashPartitions(output_partitions, batch);
  }

  if (is_separate_build_) {
    bool success;
    RETURN_IF_ERROR(probe_client->TransferReservationTo(
        buffer_pool_client_, probe_reservation, &success));
    DCHECK(success) << "Transferring within query shouldn't violate reservation limits.";
  }
  return Status::OK();
}
| |
| void PhjBuilder::CleanUpHashPartitions( |
| deque<unique_ptr<PhjBuilderPartition>>* output_partitions, RowBatch* batch) { |
| SCOPED_TIMER(profile()->total_time_counter()); |
| if (state_ == HashJoinState::REPARTITIONING_PROBE) { |
| // Finished repartitioning this partition. Discard before pushing more spilled |
| // partitions onto 'spilled_partitions_'. |
| DCHECK(!spilled_partitions_.empty()); |
| spilled_partitions_.pop_back(); |
| } |
| |
| for (int i = 0; i < PARTITION_FANOUT; ++i) { |
| unique_ptr<PhjBuilderPartition> partition = std::move(hash_partitions_[i]); |
| if (partition->IsClosed()) continue; |
| if (partition->is_spilled()) { |
| DCHECK(partition->hash_tbl() == nullptr) << DebugString(); |
| DCHECK_EQ(partition->build_rows()->BytesPinned(false), 0) |
| << "Build was fully unpinned in BuildHashTablesAndPrepareProbeStreams()"; |
| if (partition->num_spilled_probe_rows() == 0 |
| && !NeedToProcessUnmatchedBuildRows(join_op_) |
| && num_probe_threads_ == 1) { |
| COUNTER_ADD(num_hash_table_builds_skipped_, 1); |
| partition->Close(nullptr); |
| } else { |
| // For shared builds, always add the partition to keep the spilled partitions |
| // in sync across all the builders and join nodes. |
| spilled_partitions_.push_back(std::move(partition)); |
| } |
| } else if (NeedToProcessUnmatchedBuildRows(join_op_)) { |
| output_partitions->push_back(std::move(partition)); |
| } else { |
| // No more processing is required for this partition. |
| partition->Close(batch); |
| } |
| } |
| hash_partitions_.clear(); |
| } |
| |
| Status PhjBuilder::DoneProbingSinglePartition(BufferPool::ClientHandle* probe_client, |
| RuntimeProfile* probe_profile, |
| deque<unique_ptr<PhjBuilderPartition>>* output_partitions, RowBatch* batch) { |
| VLOG(3) << "PHJ(node_id=" << join_node_id_ << ") done probing single partition."; |
| DCHECK_EQ(is_separate_build_, probe_client != buffer_pool_client_); |
| // Calculate before popping off the last 'spilled_partition_'. |
| int64_t probe_reservation = CalcProbeStreamReservation(state_); |
| DCHECK_GE(probe_client->GetUnusedReservation(), probe_reservation); |
| if (num_probe_threads_ > 1) { |
| SCOPED_TIMER(probe_profile->inactive_timer()); |
| // TODO: IMPALA-9411: consider reworking to attach buffers to all output batches. |
| RETURN_IF_ERROR(probe_barrier_->Wait([&]() { |
| CleanUpSinglePartition(output_partitions, nullptr); |
| DCHECK_EQ(0, output_partitions->size()) |
| << "Cannot share build for join modes that return rows from build partitions"; |
| return Status::OK(); |
| })); |
| } else if (is_separate_build_) { |
| SCOPED_TIMER(probe_profile->inactive_timer()); |
| CleanUpSinglePartition(output_partitions, batch); |
| } else { |
| // No need to activate probe's inactive timer, since the builder will be a child of |
| // the probe and its time will be subtracted from probe's total time. |
| CleanUpSinglePartition(output_partitions, batch); |
| } |
| if (is_separate_build_) { |
| bool success; |
| RETURN_IF_ERROR(probe_client->TransferReservationTo( |
| buffer_pool_client_, probe_reservation, &success)); |
| DCHECK(success) << "Transferring within query shouldn't violate reservation limits."; |
| } |
| return Status::OK(); |
| } |
| |
| void PhjBuilder::CleanUpSinglePartition( |
| deque<unique_ptr<PhjBuilderPartition>>* output_partitions, RowBatch* batch) { |
| SCOPED_TIMER(profile()->total_time_counter()); |
| if (NeedToProcessUnmatchedBuildRows(join_op_)) { |
| DCHECK_LE(num_probe_threads_, 1) |
| << "Don't support returning build partitions with shared build"; |
| // If the build partition was in memory, we are done probing this partition. |
| // In case of right-outer, right-anti and full-outer joins, we move this partition |
| // to the list of partitions that we need to output their unmatched build rows. |
| output_partitions->push_back(std::move(spilled_partitions_.back())); |
| } else { |
| // In any other case, just close the input build partition. |
| spilled_partitions_.back()->Close(IsLeftSemiJoin(join_op_) ? nullptr : batch); |
| } |
| spilled_partitions_.pop_back(); |
| } |
| |
| Status PhjBuilder::BeginNullAwareProbe() { |
| DCHECK(null_aware_partition_ != nullptr); |
| if (num_probe_threads_ > 1) { |
| return probe_barrier_->Wait([&]() { |
| return BeginNullAwareProbeSerial(); |
| }); |
| } else { |
| return BeginNullAwareProbeSerial(); |
| } |
| } |
| |
| Status PhjBuilder::BeginNullAwareProbeSerial() { |
| BufferedTupleStream* build_rows = null_aware_partition_->build_rows(); |
| bool pinned; |
| RETURN_IF_ERROR(build_rows->PinStream(&pinned)); |
| if (!pinned) { |
| return Status(TErrorCode::NAAJ_OUT_OF_MEMORY, build_rows->num_rows(), |
| PrettyPrinter::PrintBytes(build_rows->byte_size()), |
| PrettyPrinter::PrintBytes( |
| buffer_pool_client_->GetUnusedReservation() + build_rows->BytesPinned(false)), |
| PrettyPrinter::PrintBytes(buffer_pool_client_->GetReservation())); |
| } |
| return Status::OK(); |
| } |
| |
| Status PhjBuilder::DoneProbingNullAwarePartition() { |
| DCHECK(null_aware_partition_ != nullptr); |
| if (num_probe_threads_ > 1) { |
| RETURN_IF_ERROR(probe_barrier_->Wait([&]() { |
| CloseNullAwarePartition(); |
| return Status::OK(); |
| })); |
| } else { |
| CloseNullAwarePartition(); |
| } |
| return Status::OK(); |
| } |
| |
| void PhjBuilder::CloseNullAwarePartition() { |
| if (null_aware_partition_ == nullptr) return; |
| // We don't need to pass in a batch because the anti-join only returns tuple data |
| // from the probe side - i.e. the RowDescriptor for PartitionedHashJoinNode does |
| // not include the build tuple. |
| null_aware_partition_->Close(nullptr); |
| null_aware_partition_.reset(); |
| } |
| |
| void PhjBuilder::CloseAndDeletePartitions(RowBatch* row_batch) { |
| // Close all the partitions and clean up all references to them. |
| for (unique_ptr<PhjBuilderPartition>& partition : hash_partitions_) { |
| partition->Close(row_batch); |
| } |
| hash_partitions_.clear(); |
| for (unique_ptr<PhjBuilderPartition>& partition : spilled_partitions_) { |
| partition->Close(row_batch); |
| } |
| spilled_partitions_.clear(); |
| CloseNullAwarePartition(); |
| } |
| |
| void PhjBuilder::AllocateRuntimeFilters() { |
| DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filter_ctxs_.size() == 0) |
| << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN"; |
| DCHECK(ht_ctx_ != nullptr); |
| for (int i = 0; i < filter_ctxs_.size(); ++i) { |
| if (filter_ctxs_[i].filter->is_bloom_filter()) { |
| filter_ctxs_[i].local_bloom_filter = |
| runtime_state_->filter_bank()->AllocateScratchBloomFilter( |
| filter_ctxs_[i].filter->id()); |
| } else { |
| DCHECK(filter_ctxs_[i].filter->is_min_max_filter()); |
| filter_ctxs_[i].local_min_max_filter = |
| runtime_state_->filter_bank()->AllocateScratchMinMaxFilter( |
| filter_ctxs_[i].filter->id(), filter_ctxs_[i].expr_eval->root().type()); |
| } |
| } |
| } |
| |
| void PhjBuilder::InsertRuntimeFilters( |
| FilterContext filter_ctxs[], TupleRow* build_row) noexcept { |
| // For the interpreted path we can directly use the filter_ctxs_ member variable. |
| DCHECK_EQ(filter_ctxs_.data(), filter_ctxs); |
| for (const FilterContext& ctx : filter_ctxs_) ctx.Insert(build_row); |
| } |
| |
| void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) { |
| VLOG(3) << "Join builder (join_node_id_=" << join_node_id_ << ") publishing " |
| << filter_ctxs_.size() << " filters."; |
| int32_t num_enabled_filters = 0; |
| for (const FilterContext& ctx : filter_ctxs_) { |
| BloomFilter* bloom_filter = nullptr; |
| if (ctx.local_bloom_filter != nullptr) { |
| bloom_filter = ctx.local_bloom_filter; |
| ++num_enabled_filters; |
| } else if (ctx.local_min_max_filter != nullptr |
| && !ctx.local_min_max_filter->AlwaysTrue()) { |
| ++num_enabled_filters; |
| } |
| |
| runtime_state_->filter_bank()->UpdateFilterFromLocal( |
| ctx.filter->id(), bloom_filter, ctx.local_min_max_filter); |
| } |
| |
| if (filter_ctxs_.size() > 0) { |
| string info_string; |
| if (num_enabled_filters == filter_ctxs_.size()) { |
| info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filter_ctxs_.size(), |
| filter_ctxs_.size() == 1 ? "" : "s"); |
| } else { |
| info_string = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled", |
| num_enabled_filters, filter_ctxs_.size(), filter_ctxs_.size() == 1 ? "" : "s", |
| filter_ctxs_.size() - num_enabled_filters); |
| } |
| profile()->AddInfoString("Runtime filters", info_string); |
| } |
| } |
| |
| Status PhjBuilder::BeginSpilledProbe(BufferPool::ClientHandle* probe_client, |
| RuntimeProfile* probe_profile, bool* repartitioned, |
| PhjBuilderPartition** input_partition, HashPartitions* new_partitions) { |
| DCHECK_EQ(is_separate_build_, probe_client != buffer_pool_client_); |
| DCHECK(!spilled_partitions_.empty()); |
| DCHECK_EQ(0, hash_partitions_.size()); |
| |
| if (num_probe_threads_ > 1) { |
| SCOPED_TIMER(probe_profile->inactive_timer()); |
| RETURN_IF_ERROR(probe_barrier_->Wait([&]() { return BeginSpilledProbeSerial(); })); |
| } else if (is_separate_build_) { |
| SCOPED_TIMER(probe_profile->inactive_timer()); |
| RETURN_IF_ERROR(BeginSpilledProbeSerial()); |
| } else { |
| // No need to activate probe's inactive timer, since the builder will be a child of |
| // the probe and its time will be subtracted from probe's total time. |
| RETURN_IF_ERROR(BeginSpilledProbeSerial()); |
| } |
| |
| RETURN_IF_ERROR(TransferProbeStreamReservation(probe_client)); |
| *input_partition = spilled_partitions_.back().get(); |
| if (state_ == HashJoinState::PROBING_SPILLED_PARTITION) { |
| *repartitioned = false; |
| } else { |
| DCHECK_ENUM_EQ(HashJoinState::REPARTITIONING_PROBE, state_); |
| *repartitioned = true; |
| *new_partitions = |
| HashPartitions(ht_ctx_->level(), &hash_partitions_, non_empty_build_); |
| } |
| return Status::OK(); |
| } |
| |
| Status PhjBuilder::BeginSpilledProbeSerial() { |
| SCOPED_TIMER(profile()->total_time_counter()); |
| DCHECK_EQ(0, probe_stream_reservation_.GetReservation()); |
| if (is_separate_build_ || join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { |
| DCHECK_EQ(0, buffer_pool_client_->GetUsedReservation()) |
| << "All memory should be available to bring the spilled partition into memory: " |
| << "all build and probe data shuld be spilled to disk. THe only exception is " |
| << "NAAJ probe streams, which are accounted for in the PHJ node separately."; |
| } |
| |
| // Pick the next spilled partition to process. The partition will stay in |
| // 'spilled_partitions_' until we are done probing it or repartitioning its probe. |
| // Thus it will remain valid as long as it's needed and always get cleaned up in |
| // Close(), even if an error occurs. |
| PhjBuilderPartition* partition = spilled_partitions_.back().get(); |
| DCHECK(partition->is_spilled()) << partition->DebugString(); |
| |
| if (partition->num_spilled_probe_rows() == 0 && num_probe_threads_ == 1) { |
| // If there are no probe rows, there's no need to build the hash table, and |
| // only partitions with NeedToProcessUnmatchedBuildRows() will have been added |
| // to 'spilled_partitions_' in DoneProbingHashPartitions(). |
| DCHECK(NeedToProcessUnmatchedBuildRows(join_op_)); |
| bool got_read_buffer = false; |
| RETURN_IF_ERROR(partition->build_rows()->PrepareForRead(true, &got_read_buffer)); |
| if (!got_read_buffer) { |
| return mem_tracker()->MemLimitExceeded( |
| runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, join_node_id_)); |
| } |
| COUNTER_ADD(num_hash_table_builds_skipped_, 1); |
| UpdateState(HashJoinState::PROBING_SPILLED_PARTITION); |
| return Status::OK(); |
| } |
| |
| // Set aside memory required for reading the probe stream, so that we don't use |
| // it for the hash table. |
| bool need_probe_buffer = partition->num_spilled_probe_rows() > 0; |
| int64_t saved_probe_reservation = |
| need_probe_buffer ? max_row_buffer_size_ * num_probe_threads_ : 0; |
| buffer_pool_client_->SaveReservation( |
| &probe_stream_reservation_, saved_probe_reservation); |
| VLOG(3) << "PHJ(node_id=" << join_node_id_ << ") saved " << saved_probe_reservation |
| << " for probe clients."; |
| |
| // Try to build a hash table for the spilled build partition. |
| bool built; |
| RETURN_IF_ERROR(partition->BuildHashTable(&built)); |
| if (built) { |
| UpdateState(HashJoinState::PROBING_SPILLED_PARTITION); |
| return Status::OK(); |
| } |
| // This build partition still does not fit in memory, repartition. |
| UpdateState(HashJoinState::REPARTITIONING_BUILD); |
| |
| int next_partition_level = partition->level() + 1; |
| if (UNLIKELY(next_partition_level >= MAX_PARTITION_DEPTH)) { |
| return Status(TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH, join_node_id_, |
| MAX_PARTITION_DEPTH); |
| } |
| |
| // Spill to free memory from hash tables and pinned streams for use in new partitions. |
| RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL)); |
| // Temporarily free up the probe reservation to use when repartitioning. Repartitioning |
| // will reserve as much memory as needed for the probe streams. |
| buffer_pool_client_->RestoreReservation( |
| &probe_stream_reservation_, saved_probe_reservation); |
| VLOG(3) << "PHJ(node_id=" << join_node_id_ << ") unsave " << saved_probe_reservation |
| << " for probe clients."; |
| // All reservation should be available for repartitioning. |
| DCHECK_EQ(0, probe_stream_reservation_.GetReservation()); |
| DCHECK(buffer_pool_client_->GetUsedReservation() == 0 |
| || (!is_separate_build_ && join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN)) |
| << "Only NAAJ probe streams should be consuming memory" << DebugString(); |
| |
| DCHECK_EQ(partition->build_rows()->BytesPinned(false), 0) << DebugString(); |
| int64_t num_input_rows = partition->build_rows()->num_rows(); |
| RETURN_IF_ERROR(RepartitionBuildInput(partition)); |
| |
| // Check if there was any reduction in the size of partitions after repartitioning. |
| int64_t largest_partition_rows = LargestPartitionRows(); |
| DCHECK_GE(num_input_rows, largest_partition_rows) << "Cannot have a partition with " |
| "more rows than the input"; |
| if (UNLIKELY(num_input_rows == largest_partition_rows)) { |
| return Status(TErrorCode::PARTITIONED_HASH_JOIN_REPARTITION_FAILS, join_node_id_, |
| next_partition_level, num_input_rows, DebugString(), |
| buffer_pool_client_->DebugString()); |
| } |
| return Status::OK(); |
| } |
| |
| Status PhjBuilder::RepartitionBuildInput(PhjBuilderPartition* input_partition) { |
| int new_level = input_partition->level() + 1; |
| DCHECK_GE(new_level, 1); |
| SCOPED_TIMER(repartition_timer_); |
| COUNTER_ADD(num_repartitions_, 1); |
| RuntimeState* state = runtime_state_; |
| |
| // Setup the read buffer and the new partitions. |
| BufferedTupleStream* build_rows = input_partition->build_rows(); |
| DCHECK(build_rows != nullptr); |
| bool got_read_buffer; |
| RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer)); |
| if (!got_read_buffer) { |
| return mem_tracker()->MemLimitExceeded( |
| state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, join_node_id_)); |
| } |
| RETURN_IF_ERROR(CreateHashPartitions(new_level)); |
| |
| // Repartition 'input_stream' into 'hash_partitions_'. |
| RowBatch build_batch(row_desc_, state->batch_size(), mem_tracker()); |
| bool eos = false; |
| while (!eos) { |
| RETURN_IF_CANCELLED(state); |
| RETURN_IF_ERROR(state->CheckQueryState()); |
| |
| RETURN_IF_ERROR(build_rows->GetNext(&build_batch, &eos)); |
| RETURN_IF_ERROR(AddBatch(&build_batch)); |
| build_batch.Reset(); |
| } |
| |
| // Done reading the input, we can safely close it now to free memory. |
| input_partition->Close(nullptr); |
| RETURN_IF_ERROR(FinalizeBuild(state)); |
| return Status::OK(); |
| } |
| |
| int64_t PhjBuilder::LargestPartitionRows() const { |
| int64_t max_rows = 0; |
| for (int i = 0; i < hash_partitions_.size(); ++i) { |
| PhjBuilderPartition* partition = hash_partitions_[i].get(); |
| DCHECK(partition != nullptr); |
| if (partition->IsClosed()) continue; |
| int64_t rows = partition->build_rows()->num_rows(); |
| if (rows > max_rows) max_rows = rows; |
| } |
| return max_rows; |
| } |
| |
| bool PhjBuilder::HashTableStoresNulls( |
| TJoinOp::type join_op, const vector<bool>& is_not_distinct_from) { |
| return join_op == TJoinOp::RIGHT_OUTER_JOIN || join_op == TJoinOp::RIGHT_ANTI_JOIN |
| || join_op == TJoinOp::FULL_OUTER_JOIN |
| || std::accumulate(is_not_distinct_from.begin(), is_not_distinct_from.end(), false, |
| std::logical_or<bool>()); |
| } |
| |
| void PhjBuilder::ReturnReservation( |
| BufferPool::ClientHandle* probe_client, int64_t bytes) { |
| DCHECK(is_separate_build_); |
| DCHECK(buffer_pool_client_ != probe_client); |
| bool success; |
| Status status = |
| probe_client->TransferReservationTo(buffer_pool_client_, bytes, &success); |
| DCHECK(status.ok()) << status.GetDetail() << " shouldn't have any dirty pages to flush"; |
| DCHECK(success) << "Transferring within query shouldn't violate reservation limits."; |
| } |
| |
| PhjBuilderPartition::PhjBuilderPartition(RuntimeState* state, PhjBuilder* parent, |
| int level) |
| : parent_(parent), |
| id_(parent->next_partition_id_++), |
| is_spilled_(false), |
| level_(level) { |
| build_rows_ = make_unique<BufferedTupleStream>(state, parent_->row_desc_, |
| parent_->buffer_pool_client_, parent->spillable_buffer_size_, |
| parent->max_row_buffer_size_); |
| } |
| |
| PhjBuilderPartition::~PhjBuilderPartition() { |
| DCHECK(IsClosed()); |
| } |
| |
| int64_t PhjBuilderPartition::EstimatedInMemSize() const { |
| return build_rows_->byte_size() + HashTable::EstimateSize(build_rows_->num_rows()); |
| } |
| |
| void PhjBuilderPartition::Close(RowBatch* batch) { |
| if (IsClosed()) return; |
| |
| if (hash_tbl_ != nullptr) { |
| hash_tbl_->StatsCountersAdd(parent_->ht_stats_profile_.get()); |
| hash_tbl_->Close(); |
| } |
| |
| // Transfer ownership of 'build_rows_' memory to 'batch' if 'batch' is not NULL. |
| // Flush out the resources to free up memory for subsequent partitions. |
| if (build_rows_ != nullptr) { |
| build_rows_->Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES); |
| build_rows_.reset(); |
| } |
| } |
| |
| Status PhjBuilderPartition::Spill(BufferedTupleStream::UnpinMode mode) { |
| DCHECK(!IsClosed()); |
| RETURN_IF_ERROR(parent_->runtime_state_->StartSpilling(parent_->mem_tracker())); |
| // Close the hash table and unpin the stream backing it to free memory. |
| if (hash_tbl() != nullptr) { |
| hash_tbl_->Close(); |
| hash_tbl_.reset(); |
| } |
| RETURN_IF_ERROR(build_rows_->UnpinStream(mode)); |
| if (!is_spilled_) { |
| COUNTER_ADD(parent_->num_spilled_partitions_, 1); |
| if (parent_->num_spilled_partitions_->value() == 1) { |
| parent_->profile()->AppendExecOption("Spilled"); |
| } |
| } |
| is_spilled_ = true; |
| return Status::OK(); |
| } |
| |
// Tries to pin 'build_rows_' and insert all of its rows into a newly created hash
// table for this partition.
//
// '*built' is set to true only if the stream was pinned and every row was inserted.
// It is false if the stream could not be pinned, the hash table could not be
// initialized, or an insert failed. On those failure paths (the 'not_built' label) the
// hash table is closed and released; this function does not unpin 'build_rows_' there.
// On success 'is_spilled_' is cleared.
//
// Returns a non-OK status on errors from the stream, hash table or inserts, and on
// cancellation/query errors. Note: the RETURN_IF_CANCELLED / RETURN_IF_ERROR early
// returns inside the loop bypass 'not_built', so 'hash_tbl_' remains allocated on
// those paths (Close() closes it if non-null).
Status PhjBuilderPartition::BuildHashTable(bool* built) {
  SCOPED_TIMER(parent_->build_hash_table_timer_);
  DCHECK(build_rows_ != nullptr);
  *built = false;

  // Before building the hash table, we need to pin the rows in memory.
  RETURN_IF_ERROR(build_rows_->PinStream(built));
  if (!*built) return Status::OK();

  RuntimeState* state = parent_->runtime_state_;
  HashTableCtx* ctx = parent_->ht_ctx_.get();
  ctx->set_level(level()); // Set the hash function for building the hash table.
  RowBatch batch(parent_->row_desc_, state->batch_size(), parent_->mem_tracker());
  vector<BufferedTupleStream::FlatRowPtr> flat_rows;
  bool eos = false;

  // Allocate the partition-local hash table. Initialize the number of buckets based on
  // the number of build rows (the number of rows is known at this point). This assumes
  // there are no duplicates which can be wrong. However, the upside in the common case
  // (few/no duplicates) is large and the downside when there are is low (a bit more
  // memory; the bucket memory is small compared to the memory needed for all the build
  // side allocations).
  // One corner case is if the stream contains tuples with zero footprint (no materialized
  // slots). If the tuples occupy no space, this implies all rows will be duplicates, so
  // create a small hash table, IMPALA-2256.
  //
  // TODO: Try to allocate the hash table before pinning the stream to avoid needlessly
  // reading all of the spilled rows from disk when we won't succeed anyway.
  int64_t estimated_num_buckets = HashTable::EstimateNumBuckets(build_rows()->num_rows());
  hash_tbl_.reset(HashTable::Create(parent_->ht_allocator_.get(),
      true /* store_duplicates */, parent_->row_desc_->tuple_descriptors().size(),
      build_rows(), 1 << (32 - PhjBuilder::NUM_PARTITIONING_BITS),
      estimated_num_buckets));
  bool success;
  Status status = hash_tbl_->Init(&success);
  // Both an error status and a plain "not enough memory" (success == false) abort the
  // build; in the latter case 'status' is whatever Init() returned.
  if (!status.ok() || !success) goto not_built;
  status = build_rows_->PrepareForRead(false, &success);
  if (!status.ok()) goto not_built;
  DCHECK(success) << "Stream was already pinned.";

  do {
    status = build_rows_->GetNext(&batch, &eos, &flat_rows);
    if (!status.ok()) goto not_built;
    DCHECK_EQ(batch.num_rows(), flat_rows.size());
    DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets());
    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;

    // Use the codegen'd insert function if one is available. Level 0 and deeper
    // levels have separate codegen'd variants (they differ in the hash function).
    InsertBatchFn insert_batch_fn;
    if (level() == 0) {
      insert_batch_fn = parent_->insert_batch_fn_level0_.load();
    } else {
      insert_batch_fn = parent_->insert_batch_fn_.load();
    }

    if (insert_batch_fn != nullptr) {
      if (UNLIKELY(
              !insert_batch_fn(this, prefetch_mode, ctx, &batch, flat_rows, &status))) {
        goto not_built;
      }
    } else if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, flat_rows, &status))) {
      // Interpreted fallback when no codegen'd function was registered.
      goto not_built;
    }

    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(state->GetQueryStatus());
    // Free any expr result allocations made while inserting.
    parent_->expr_results_pool_->Clear();
    batch.Reset();
  } while (!eos);

  // The hash table fits in memory and is built.
  DCHECK(*built);
  DCHECK(hash_tbl_ != nullptr);
  is_spilled_ = false;
  COUNTER_ADD(parent_->ht_stats_profile_->num_hash_buckets_,
      hash_tbl_->num_buckets());
  return Status::OK();

not_built:
  // Common failure exit: report not-built and release the partially built table.
  // 'status' carries the error, or OK if the failure was only a lack of memory.
  *built = false;
  if (hash_tbl_ != nullptr) {
    hash_tbl_->Close();
    hash_tbl_.reset();
  }
  return status;
}
| |
| std::string PhjBuilderPartition::DebugString() { |
| stringstream ss; |
| ss << "<Partition>: ptr=" << this << " id=" << id_; |
| if (IsClosed()) { |
| ss << " Closed"; |
| return ss.str(); |
| } |
| if (is_spilled()) { |
| ss << " Spilled"; |
| } |
| DCHECK(build_rows() != nullptr); |
| ss << endl |
| << " Build Rows: " << build_rows_->num_rows() |
| << " (Bytes pinned: " << build_rows_->BytesPinned(false) << ")" |
| << endl; |
| if (hash_tbl_ != nullptr) { |
| ss << " Hash Table Rows: " << hash_tbl_->size(); |
| } |
| ss << " Spilled Probe Rows: " << num_spilled_probe_rows_.Load() << endl; |
| return ss.str(); |
| } |
| |
| void PhjBuilderConfig::Codegen(FragmentState* state) { |
| LlvmCodeGen* codegen = state->codegen(); |
| DCHECK(codegen != nullptr); |
| Status build_codegen_status; |
| Status insert_codegen_status; |
| Status codegen_status; |
| |
| // Codegen for hashing rows with the builder's hash table context. |
| llvm::Function* hash_fn; |
| codegen_status = |
| HashTableCtx::CodegenHashRow(codegen, false, *hash_table_config_, &hash_fn); |
| llvm::Function* murmur_hash_fn; |
| codegen_status.MergeStatus( |
| HashTableCtx::CodegenHashRow(codegen, true, *hash_table_config_, &murmur_hash_fn)); |
| |
| // Codegen for evaluating build rows |
| llvm::Function* eval_build_row_fn; |
| codegen_status.MergeStatus(HashTableCtx::CodegenEvalRow( |
| codegen, true, *hash_table_config_, &eval_build_row_fn)); |
| |
| llvm::Function* insert_filters_fn; |
| codegen_status.MergeStatus( |
| CodegenInsertRuntimeFilters(codegen, filter_exprs_, &insert_filters_fn)); |
| |
| if (codegen_status.ok()) { |
| TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode; |
| build_codegen_status = CodegenProcessBuildBatch( |
| codegen, hash_fn, murmur_hash_fn, eval_build_row_fn, insert_filters_fn); |
| insert_codegen_status = CodegenInsertBatch( |
| codegen, hash_fn, murmur_hash_fn, eval_build_row_fn, prefetch_mode); |
| } else { |
| build_codegen_status = codegen_status; |
| insert_codegen_status = codegen_status; |
| } |
| AddCodegenStatus(build_codegen_status, "Build Side"); |
| AddCodegenStatus(insert_codegen_status, "Hash Table Construction"); |
| } |
| |
| string PhjBuilder::DebugString() const { |
| stringstream ss; |
| ss << " PhjBuilder op=" << PrintThriftEnum(join_op_) |
| << " is_separate_build=" << is_separate_build_ |
| << " num_probe_threads=" << num_probe_threads_ |
| << " state=" << PrintState(state_) |
| << " Hash partitions: " << hash_partitions_.size() << ":" << endl; |
| for (int i = 0; i < hash_partitions_.size(); ++i) { |
| ss << " Hash partition " << i << " " << hash_partitions_[i]->DebugString() << endl; |
| } |
| ss << " Spilled partitions: " << spilled_partitions_.size() << ":" << endl; |
| for (int i = 0; i < spilled_partitions_.size(); ++i) { |
| ss << " Spilled partition " << i << " " |
| << spilled_partitions_[i]->DebugString() << endl; |
| } |
| if (null_aware_partition_ != nullptr) { |
| ss << "Null-aware partition: " << null_aware_partition_->DebugString(); |
| } |
| ss << " buffer_pool_client=" << buffer_pool_client_->DebugString(); |
| return ss.str(); |
| } |
| |
| Status PhjBuilderConfig::CodegenProcessBuildBatch(LlvmCodeGen* codegen, |
| llvm::Function* hash_fn, llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn, |
| llvm::Function* insert_filters_fn) { |
| llvm::Function* process_build_batch_fn = |
| codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true); |
| DCHECK(process_build_batch_fn != nullptr); |
| |
| // Replace call sites |
| int replaced = |
| codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn, "EvalBuildRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| replaced = codegen->ReplaceCallSites( |
| process_build_batch_fn, insert_filters_fn, "InsertRuntimeFilters"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| HashTableCtx::HashTableReplacedConstants replaced_constants; |
| const bool stores_duplicates = true; |
| const int num_build_tuples = input_row_desc_->tuple_descriptors().size(); |
| // Replace some hash table parameters with constants. |
| RETURN_IF_ERROR(HashTableCtx::ReplaceHashTableConstants(codegen, *hash_table_config_, |
| stores_duplicates, num_build_tuples, process_build_batch_fn, &replaced_constants)); |
| DCHECK_GE(replaced_constants.stores_nulls, 1); |
| DCHECK_EQ(replaced_constants.finds_some_nulls, 0); |
| DCHECK_EQ(replaced_constants.stores_duplicates, 0); |
| DCHECK_EQ(replaced_constants.stores_tuples, 0); |
| DCHECK_EQ(replaced_constants.quadratic_probing, 0); |
| |
| llvm::Value* is_null_aware_arg = codegen->GetArgument(process_build_batch_fn, 5); |
| is_null_aware_arg->replaceAllUsesWith( |
| codegen->GetBoolConstant(join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN)); |
| |
| llvm::Function* process_build_batch_fn_level0 = |
| codegen->CloneFunction(process_build_batch_fn); |
| |
| // Always build runtime filters at level0 (if there are any). |
| // Note that the first argument of this function is the return value. |
| llvm::Value* build_filter_l0_arg = |
| codegen->GetArgument(process_build_batch_fn_level0, 4); |
| build_filter_l0_arg->replaceAllUsesWith( |
| codegen->GetBoolConstant(filter_descs_.size() > 0)); |
| |
| // process_build_batch_fn_level0 uses CRC hash if available, |
| replaced = |
| codegen->ReplaceCallSites(process_build_batch_fn_level0, hash_fn, "HashRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| // process_build_batch_fn uses murmur |
| replaced = |
| codegen->ReplaceCallSites(process_build_batch_fn, murmur_hash_fn, "HashRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| // Never build filters after repartitioning, as all rows have already been added to the |
| // filters during the level0 build. Note that the first argument of this function is the |
| // return value. |
| llvm::Value* build_filter_arg = codegen->GetArgument(process_build_batch_fn, 4); |
| build_filter_arg->replaceAllUsesWith(codegen->false_value()); |
| |
| // Finalize ProcessBuildBatch functions |
| process_build_batch_fn = codegen->FinalizeFunction(process_build_batch_fn); |
| if (process_build_batch_fn == nullptr) { |
| return Status( |
| "Codegen'd PhjBuilder::ProcessBuildBatch() function " |
| "failed verification, see log"); |
| } |
| process_build_batch_fn_level0 = |
| codegen->FinalizeFunction(process_build_batch_fn_level0); |
| if (process_build_batch_fn == nullptr) { |
| return Status( |
| "Codegen'd level-zero PhjBuilder::ProcessBuildBatch() " |
| "function failed verification, see log"); |
| } |
| |
| // Register native function pointers |
| codegen->AddFunctionToJit(process_build_batch_fn, &process_build_batch_fn_); |
| codegen->AddFunctionToJit(process_build_batch_fn_level0, |
| &process_build_batch_fn_level0_); |
| return Status::OK(); |
| } |
| |
| Status PhjBuilderConfig::CodegenInsertBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn, |
| llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn, |
| TPrefetchMode::type prefetch_mode) { |
| llvm::Function* insert_batch_fn = |
| codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true); |
| // Context required to generate hash table codegened methods. |
| llvm::Function* build_equals_fn; |
| RETURN_IF_ERROR( |
| HashTableCtx::CodegenEquals(codegen, true, *hash_table_config_, &build_equals_fn)); |
| |
| // Replace the parameter 'prefetch_mode' with constant. |
| llvm::Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1); |
| DCHECK_GE(prefetch_mode, TPrefetchMode::NONE); |
| DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET); |
| prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode)); |
| |
| // Use codegen'd EvalBuildRow() function |
| int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| // Use codegen'd Equals() function |
| replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| // Replace hash-table parameters with constants. |
| HashTableCtx::HashTableReplacedConstants replaced_constants; |
| const bool stores_duplicates = true; |
| const int num_build_tuples = input_row_desc_->tuple_descriptors().size(); |
| RETURN_IF_ERROR(HashTableCtx::ReplaceHashTableConstants(codegen, *hash_table_config_, |
| stores_duplicates, num_build_tuples, insert_batch_fn, &replaced_constants)); |
| DCHECK_GE(replaced_constants.stores_nulls, 1); |
| DCHECK_EQ(replaced_constants.finds_some_nulls, 0); |
| DCHECK_GE(replaced_constants.stores_duplicates, 1); |
| DCHECK_GE(replaced_constants.stores_tuples, 1); |
| DCHECK_GE(replaced_constants.quadratic_probing, 1); |
| |
| llvm::Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn); |
| |
| // Use codegen'd hash functions |
| replaced = codegen->ReplaceCallSites(insert_batch_fn_level0, hash_fn, "HashRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| replaced = codegen->ReplaceCallSites(insert_batch_fn, murmur_hash_fn, "HashRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn); |
| if (insert_batch_fn == nullptr) { |
| return Status( |
| "PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd " |
| "InsertBatch() function failed verification, see log"); |
| } |
| insert_batch_fn_level0 = codegen->FinalizeFunction(insert_batch_fn_level0); |
| if (insert_batch_fn_level0 == nullptr) { |
| return Status( |
| "PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd zero-level " |
| "InsertBatch() function failed verification, see log"); |
| } |
| |
| codegen->AddFunctionToJit(insert_batch_fn, &insert_batch_fn_); |
| codegen->AddFunctionToJit(insert_batch_fn_level0, &insert_batch_fn_level0_); |
| return Status::OK(); |
| } |
| |
| // An example of the generated code for a query with two filters built by this node. |
| // |
| // ; Function Attrs: noinline |
| // define void @InsertRuntimeFilters(%"class.impala::PhjBuilder"* %this, |
| // %"struct.impala::FilterContext"* %filter_ctxs, |
| // %"class.impala::TupleRow"* %row) #49 { |
| // entry: |
| // %0 = getelementptr %"struct.impala::FilterContext", |
| // %"struct.impala::FilterContext"* %filter_ctxs, i32 0 |
| // call void @FilterContextInsert(%"struct.impala::FilterContext"* %0, |
| // %"class.impala::TupleRow"* %row) |
| // %1 = getelementptr %"struct.impala::FilterContext", |
| // %"struct.impala::FilterContext"* %filter_ctxs, i32 1 |
| // call void @FilterContextInsert.3(%"struct.impala::FilterContext"* %1, |
| // %"class.impala::TupleRow"* %row) |
| // ret void |
| // } |
| Status PhjBuilderConfig::CodegenInsertRuntimeFilters( |
| LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, llvm::Function** fn) { |
| llvm::LLVMContext& context = codegen->context(); |
| LlvmBuilder builder(context); |
| |
| *fn = nullptr; |
| llvm::Type* this_type = codegen->GetStructPtrType<PhjBuilder>(); |
| llvm::PointerType* filters_ctx_arr_type = codegen->GetStructPtrType<FilterContext>(); |
| llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>(); |
| LlvmCodeGen::FnPrototype prototype( |
| codegen, "InsertRuntimeFilters", codegen->void_type()); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("filter_ctxs", filters_ctx_arr_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); |
| |
| llvm::Value* args[3]; |
| llvm::Function* insert_runtime_filters_fn = prototype.GeneratePrototype(&builder, args); |
| llvm::Value* filter_ctxs = args[1]; |
| llvm::Value* row_arg = args[2]; |
| |
| int num_filters = filter_exprs.size(); |
| for (int i = 0; i < num_filters; ++i) { |
| llvm::Function* insert_fn; |
| RETURN_IF_ERROR(FilterContext::CodegenInsert( |
| codegen, filter_exprs[i], filter_descs_[i], &insert_fn)); |
| llvm::Value* filter_context_ptr = builder.CreateConstGEP1_32(filter_ctxs, i); |
| llvm::Value* insert_args[] = {filter_context_ptr, row_arg}; |
| builder.CreateCall(insert_fn, insert_args); |
| } |
| |
| builder.CreateRetVoid(); |
| |
| if (num_filters > 0) { |
| // Don't inline this function to avoid code bloat in ProcessBuildBatch(). |
| // If there is any filter, InsertRuntimeFilters() is large enough to not benefit |
| // much from inlining. |
| insert_runtime_filters_fn->addFnAttr(llvm::Attribute::NoInline); |
| } |
| |
| *fn = codegen->FinalizeFunction(insert_runtime_filters_fn); |
| if (*fn == nullptr) { |
| return Status("Codegen'd PhjBuilder::InsertRuntimeFilters() failed " |
| "verification, see log"); |
| } |
| return Status::OK(); |
| } |