| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/partitioned-hash-join-builder.h" |
| |
| #include <numeric> |
| |
| #include <gutil/strings/substitute.h> |
| |
| #include "codegen/llvm-codegen.h" |
| #include "exec/hash-table.inline.h" |
| #include "exprs/scalar-expr.h" |
| #include "exprs/scalar-expr-evaluator.h" |
| #include "runtime/buffered-tuple-stream.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/mem-tracker.h" |
| #include "runtime/query-state.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/runtime-filter-bank.h" |
| #include "runtime/runtime-filter.h" |
| #include "runtime/runtime-state.h" |
| #include "util/bloom-filter.h" |
| #include "util/min-max-filter.h" |
| #include "util/runtime-profile-counters.h" |
| |
| #include "gen-cpp/PlanNodes_types.h" |
| |
| #include "common/names.h" |
| |
// Error message emitted when a stream cannot acquire its initial read buffer.
// '$0' is substituted with the join node id (see RepartitionBuildInput()).
static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
    "Failed to acquire initial read "
    "buffer for stream in hash join node $0. Reducing query concurrency or increasing "
    "the memory limit may help this query to complete successfully.";
| |
| using namespace impala; |
| using strings::Substitute; |
| |
| const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder"; |
| |
// The constructor only stores references/parameters and zero-initializes
// counters and codegen'd function pointers; real initialization happens in
// InitExprsAndFilters()/Prepare()/Open().
PhjBuilder::PhjBuilder(int join_node_id, const string& join_node_label,
    TJoinOp::type join_op, const RowDescriptor* probe_row_desc,
    const RowDescriptor* build_row_desc, RuntimeState* state,
    BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
    int64_t max_row_buffer_size)
  // -1 sink id: the builder is an internal sink, not a fragment output sink.
  : DataSink(-1, build_row_desc,
        Substitute("Hash Join Builder (join_node_id=$0)", join_node_id), state),
    runtime_state_(state),
    join_node_id_(join_node_id),
    join_node_label_(join_node_label),
    join_op_(join_op),
    probe_row_desc_(probe_row_desc),
    buffer_pool_client_(buffer_pool_client),
    spillable_buffer_size_(spillable_buffer_size),
    max_row_buffer_size_(max_row_buffer_size),
    non_empty_build_(false),
    // Profile counters below are created in Prepare().
    partitions_created_(NULL),
    largest_partition_percent_(NULL),
    max_partition_level_(NULL),
    num_build_rows_partitioned_(NULL),
    num_spilled_partitions_(NULL),
    num_repartitions_(NULL),
    partition_build_rows_timer_(NULL),
    build_hash_table_timer_(NULL),
    repartition_timer_(NULL),
    null_aware_partition_(NULL),
    // Codegen'd function pointers; set by Codegen() when codegen succeeds.
    process_build_batch_fn_(NULL),
    process_build_batch_fn_level0_(NULL),
    insert_batch_fn_(NULL),
    insert_batch_fn_level0_(NULL) {}
| |
| Status PhjBuilder::InitExprsAndFilters(RuntimeState* state, |
| const vector<TEqJoinCondition>& eq_join_conjuncts, |
| const vector<TRuntimeFilterDesc>& filter_descs) { |
| for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) { |
| ScalarExpr* build_expr; |
| RETURN_IF_ERROR( |
| ScalarExpr::Create(eq_join_conjunct.right, *row_desc_, state, &build_expr)); |
| build_exprs_.push_back(build_expr); |
| is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from); |
| } |
| |
| for (const TRuntimeFilterDesc& filter_desc : filter_descs) { |
| DCHECK(state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL || |
| filter_desc.is_broadcast_join || state->query_options().num_nodes == 1); |
| DCHECK(!state->query_options().disable_row_runtime_filtering || |
| filter_desc.applied_on_partition_columns); |
| // Skip over filters that are not produced by this instance of the join, i.e. |
| // broadcast filters where this instance was not selected as a filter producer. |
| const vector<TRuntimeFilterSource> filters_produced = |
| state->instance_ctx().filters_produced; |
| auto it = std::find_if(filters_produced.begin(), filters_produced.end(), |
| [this, &filter_desc](const TRuntimeFilterSource f) { |
| return f.src_node_id == join_node_id_ && f.filter_id == filter_desc.filter_id; |
| }); |
| if (it == filters_produced.end()) continue; |
| ScalarExpr* filter_expr; |
| RETURN_IF_ERROR( |
| ScalarExpr::Create(filter_desc.src_expr, *row_desc_, state, &filter_expr)); |
| filter_exprs_.push_back(filter_expr); |
| |
| // TODO: Move to Prepare(). |
| filter_ctxs_.emplace_back(); |
| // TODO: IMPALA-4400 - implement local aggregation of runtime filters. |
| filter_ctxs_.back().filter = state->filter_bank()->RegisterFilter(filter_desc, true); |
| } |
| return Status::OK(); |
| } |
| |
// Creates the hash table context, evaluators for the runtime filter source
// expressions and all profile counters. Called once before Open().
Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
  // 'build_exprs_' is passed for both build and probe sides: the builder only
  // ever evaluates build rows. The same results pool is likewise shared.
  RETURN_IF_ERROR(HashTableCtx::Create(&obj_pool_, state, build_exprs_, build_exprs_,
      HashTableStoresNulls(), is_not_distinct_from_, state->fragment_hash_seed(),
      MAX_PARTITION_DEPTH, row_desc_->tuple_descriptors().size(), expr_perm_pool_.get(),
      expr_results_pool_.get(), expr_results_pool_.get(), &ht_ctx_));

  // One evaluator per registered filter expression (paired by index).
  DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
  for (int i = 0; i < filter_exprs_.size(); ++i) {
    RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, &obj_pool_,
        expr_perm_pool_.get(), expr_results_pool_.get(), &filter_ctxs_[i].expr_eval));
  }

  partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
  largest_partition_percent_ =
      profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
  max_partition_level_ =
      profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT);
  num_build_rows_partitioned_ =
      ADD_COUNTER(profile(), "BuildRowsPartitioned", TUnit::UNIT);
  ht_stats_profile_ = HashTable::AddHashTableCounters(profile());
  num_spilled_partitions_ = ADD_COUNTER(profile(), "SpilledPartitions", TUnit::UNIT);
  num_repartitions_ = ADD_COUNTER(profile(), "NumRepartitions", TUnit::UNIT);
  partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime");
  build_hash_table_timer_ = ADD_TIMER(profile(), "HashTablesBuildTime");
  repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime");
  state->CheckAndAddCodegenDisabledMessage(profile());
  return Status::OK();
}
| |
// Opens expression evaluators, creates the level-0 hash partitions and
// allocates scratch runtime filters. May be called again after Reset();
// 'ht_allocator_' is only created on the first call.
Status PhjBuilder::Open(RuntimeState* state) {
  RETURN_IF_ERROR(ht_ctx_->Open(state));

  for (const FilterContext& ctx : filter_ctxs_) {
    RETURN_IF_ERROR(ctx.expr_eval->Open(state));
  }
  if (ht_allocator_ == nullptr) {
    // Create 'ht_allocator_' on the first call to Open().
    ht_allocator_.reset(new Suballocator(ExecEnv::GetInstance()->buffer_pool(),
        buffer_pool_client_, spillable_buffer_size_));
  }
  RETURN_IF_ERROR(CreateHashPartitions(0));
  AllocateRuntimeFilters();

  // NAAJ needs an extra partition to collect build rows with NULL join keys.
  if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
    RETURN_IF_ERROR(CreateAndPreparePartition(0, &null_aware_partition_));
  }
  return Status::OK();
}
| |
// Partitions one batch of build rows into 'hash_partitions_'. Dispatches to
// the codegen'd ProcessBuildBatch if available ('process_build_batch_fn_'
// non-NULL), otherwise the interpreted path. Runtime filters are only built
// at partitioning level 0 (i.e. the first pass over the input).
Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) {
  SCOPED_TIMER(partition_build_rows_timer_);
  bool build_filters = ht_ctx_->level() == 0 && filter_ctxs_.size() > 0;
  if (process_build_batch_fn_ == NULL) {
    RETURN_IF_ERROR(ProcessBuildBatch(batch, ht_ctx_.get(), build_filters,
        join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN));

  } else {
    DCHECK(process_build_batch_fn_level0_ != NULL);
    // Separate codegen'd versions exist for level 0 vs. deeper levels
    // (different hash functions after repartitioning).
    if (ht_ctx_->level() == 0) {
      RETURN_IF_ERROR(
          process_build_batch_fn_level0_(this, batch, ht_ctx_.get(), build_filters,
              join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN));
    } else {
      RETURN_IF_ERROR(process_build_batch_fn_(this, batch, ht_ctx_.get(), build_filters,
          join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN));
    }
  }

  // Free any expr result allocations made during partitioning.
  expr_results_pool_->Clear();
  COUNTER_ADD(num_build_rows_partitioned_, batch->num_rows());
  return Status::OK();
}
| |
// Called after all build input was sent: updates skew counters, publishes
// runtime filters (level 0 only) and builds the in-memory hash tables.
Status PhjBuilder::FlushFinal(RuntimeState* state) {
  int64_t num_build_rows = 0;
  for (Partition* partition : hash_partitions_) {
    num_build_rows += partition->build_rows()->num_rows();
  }

  if (num_build_rows > 0) {
    // Record the share of rows that landed in the biggest partition as a
    // measure of build-side skew.
    double largest_fraction = 0.0;
    for (Partition* partition : hash_partitions_) {
      largest_fraction = max(largest_fraction,
          partition->build_rows()->num_rows() / static_cast<double>(num_build_rows));
    }
    COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(largest_fraction * 100));
  }

  if (VLOG_IS_ON(2)) {
    stringstream ss;
    ss << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:", join_node_id_,
        hash_partitions_[0]->level(), num_build_rows);
    for (int i = 0; i < hash_partitions_.size(); ++i) {
      Partition* partition = hash_partitions_[i];
      double percent = num_build_rows == 0 ? 0.0 : partition->build_rows()->num_rows()
          * 100 / static_cast<double>(num_build_rows);
      ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
         << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
         << " #rows:" << partition->build_rows()->num_rows() << endl;
    }
    if (null_aware_partition_ != nullptr) {
      ss << " Null-aware partition: " << null_aware_partition_->DebugString();
    }
    VLOG(2) << ss.str();
  }

  // Filters are only published after the first (level 0) pass; repartitioning
  // passes see the same rows again and must not re-publish.
  if (ht_ctx_->level() == 0) {
    PublishRuntimeFilters(num_build_rows);
    non_empty_build_ |= (num_build_rows > 0);
  }

  if (null_aware_partition_ != nullptr && null_aware_partition_->is_spilled()) {
    // Free up memory for the hash tables of other partitions by unpinning the
    // last block of the null aware partition's stream.
    RETURN_IF_ERROR(null_aware_partition_->Spill(BufferedTupleStream::UNPIN_ALL));
  }

  RETURN_IF_ERROR(BuildHashTablesAndPrepareProbeStreams());
  return Status::OK();
}
| |
// Releases all resources: partitions, hash table context, filter evaluators
// and expressions. Idempotent via 'closed_'.
void PhjBuilder::Close(RuntimeState* state) {
  if (closed_) return;
  // nullptr: no batch to transfer buffer ownership to - buffers are freed.
  CloseAndDeletePartitions(nullptr);
  if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
  ht_ctx_.reset();
  for (const FilterContext& ctx : filter_ctxs_) {
    if (ctx.expr_eval != nullptr) ctx.expr_eval->Close(state);
  }
  ScalarExpr::Close(filter_exprs_);
  ScalarExpr::Close(build_exprs_);
  obj_pool_.Clear();
  DataSink::Close(state);
  closed_ = true;
}
| |
// Returns the builder to a state where it can accept a new build input.
// Buffers of the old partitions are transferred to 'row_batch' if non-NULL.
void PhjBuilder::Reset(RowBatch* row_batch) {
  // Free expr result allocations from the previous build.
  expr_results_pool_->Clear();
  non_empty_build_ = false;
  CloseAndDeletePartitions(row_batch);
}
| |
// Creates a new partition at 'level', registers it in 'all_partitions_' (which
// owns it) and prepares its build stream for writing. '*partition' is the
// non-owning output pointer. The initial write buffer must always be
// obtainable because it is accounted for in the node's minimum reservation.
Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) {
  all_partitions_.emplace_back(new Partition(runtime_state_, this, level));
  *partition = all_partitions_.back().get();
  RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_label_, true));
  bool got_buffer;
  RETURN_IF_ERROR((*partition)->build_rows()->PrepareForWrite(&got_buffer));
  DCHECK(got_buffer)
      << "Accounted in min reservation" << buffer_pool_client_->DebugString();
  return Status::OK();
}
| |
// Creates the PARTITION_FANOUT hash partitions used to partition build rows
// at the given level and updates the related counters.
Status PhjBuilder::CreateHashPartitions(int level) {
  DCHECK(hash_partitions_.empty());
  ht_ctx_->set_level(level); // Set the hash function for partitioning input.
  for (int i = 0; i < PARTITION_FANOUT; ++i) {
    Partition* new_partition;
    RETURN_IF_ERROR(CreateAndPreparePartition(level, &new_partition));
    hash_partitions_.push_back(new_partition);
  }
  COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
  COUNTER_SET(max_partition_level_, level);
  return Status::OK();
}
| |
// Slow path for appending 'row' to 'stream' when the stream is out of memory:
// repeatedly spills a partition and retries the append. Returns true on
// success; returns false and sets '*status' if no partition can be spilled or
// the append fails for another reason.
bool PhjBuilder::AppendRowStreamFull(
    BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept {
  while (true) {
    // We ran out of memory. Pick a partition to spill. If we ran out of unspilled
    // partitions, SpillPartition() will return an error status.
    *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
    if (!status->ok()) return false;
    if (stream->AddRow(row, status)) return true;
    if (!status->ok()) return false;
    // Spilling one partition does not guarantee we can append a row. Keep
    // spilling until we can append this row.
  }
}
| |
| // TODO: can we do better with a different spilling heuristic? |
| Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode, |
| Partition** spilled_partition) { |
| DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT); |
| Partition* best_candidate = nullptr; |
| if (null_aware_partition_ != nullptr && null_aware_partition_->CanSpill()) { |
| // Spill null-aware partition first if possible - it is always processed last. |
| best_candidate = null_aware_partition_; |
| } else { |
| // Iterate over the partitions and pick the largest partition to spill. |
| int64_t max_freed_mem = 0; |
| for (Partition* candidate : hash_partitions_) { |
| if (!candidate->CanSpill()) continue; |
| int64_t mem = candidate->build_rows()->BytesPinned(false); |
| if (candidate->hash_tbl() != NULL) { |
| // The hash table should not have matches, since we have not probed it yet. |
| // Losing match info would lead to incorrect results (IMPALA-1488). |
| DCHECK(!candidate->hash_tbl()->HasMatches()); |
| mem += candidate->hash_tbl()->ByteSize(); |
| } |
| if (mem > max_freed_mem) { |
| max_freed_mem = mem; |
| best_candidate = candidate; |
| } |
| } |
| } |
| |
| if (best_candidate == nullptr) { |
| return Status(Substitute("Internal error: could not find a partition to spill in " |
| " hash join $0: \n$1\nClient:\n$2", |
| join_node_id_, DebugString(), buffer_pool_client_->DebugString())); |
| } |
| |
| VLOG(2) << "Spilling partition: " << best_candidate->DebugString() << endl |
| << DebugString(); |
| RETURN_IF_ERROR(best_candidate->Spill(mode)); |
| if (spilled_partition != nullptr) *spilled_partition = best_candidate; |
| return Status::OK(); |
| } |
| |
// When this function is called, we've finished processing the current build input
// (either from the child ExecNode or from repartitioning a spilled partition). The build
// rows have only been partitioned, we still need to build hash tables over them. Some
// of the partitions could have already been spilled and attempting to build hash
// tables over the non-spilled ones can cause them to spill.
//
// At the end of the function all partitions either have a hash table (and therefore are
// not spilled) or are spilled. Partitions that have hash tables don't need to spill on
// the probe side. Empty partitions are closed outright.
//
// This maps perfectly to a 0-1 knapsack where the weight is the memory to keep the
// build rows and hash table and the value is the expected IO savings.
// For now, we go with a greedy solution.
//
// TODO: implement the knapsack solution.
Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
  DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());

  // First pass: close empty partitions and fully unpin already-spilled ones.
  for (int i = 0; i < PARTITION_FANOUT; ++i) {
    Partition* partition = hash_partitions_[i];
    if (partition->build_rows()->num_rows() == 0) {
      // This partition is empty, no need to do anything else.
      partition->Close(NULL);
    } else if (partition->is_spilled()) {
      // We don't need any build-side data for spilled partitions in memory.
      RETURN_IF_ERROR(
          partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
    }
  }

  // Allocate probe buffers for all partitions that are already spilled. Do this before
  // building hash tables because allocating probe buffers may cause some more partitions
  // to be spilled. This avoids wasted work on building hash tables for partitions that
  // won't fit in memory alongside the required probe buffers.
  RETURN_IF_ERROR(InitSpilledPartitionProbeStreams());

  // Second pass: build hash tables for the partitions still in memory, spilling
  // any partition whose table cannot be built within the available memory.
  for (int i = 0; i < PARTITION_FANOUT; ++i) {
    Partition* partition = hash_partitions_[i];
    if (partition->IsClosed() || partition->is_spilled()) continue;

    bool built = false;
    DCHECK(partition->build_rows()->is_pinned());
    RETURN_IF_ERROR(partition->BuildHashTable(&built));
    // If we did not have enough memory to build this hash table, we need to spill this
    // partition (clean up the hash table, unpin build).
    if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL));
  }

  // We may have spilled additional partitions while building hash tables, we need to
  // initialize probe buffers for them.
  // TODO: once we have reliable reservations (IMPALA-3200) this should no longer be
  // necessary: we will know exactly how many partitions will fit in memory and we can
  // avoid building then immediately destroying hash tables.
  RETURN_IF_ERROR(InitSpilledPartitionProbeStreams());

  // TODO: at this point we could have freed enough memory to pin and build some
  // spilled partitions. This can happen, for example if there is a lot of skew.
  // Partition 1: 10GB (pinned initially).
  // Partition 2,3,4: 1GB (spilled during partitioning the build).
  // In the previous step, we could have unpinned 10GB (because there was not enough
  // memory to build a hash table over it) which can now free enough memory to
  // build hash tables over the remaining 3 partitions.
  // We start by spilling the largest partition though so the build input would have
  // to be pretty pathological.
  // Investigate if this is worthwhile.
  return Status::OK();
}
| |
// Ensures there is one write-prepared probe stream per spilled (non-closed)
// partition. Acquiring a write buffer may itself force further partitions to
// spill, which in turn raises the number of streams needed - hence the nested
// retry loops.
Status PhjBuilder::InitSpilledPartitionProbeStreams() {
  DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());

  int num_spilled_partitions = 0;
  for (int i = 0; i < PARTITION_FANOUT; ++i) {
    Partition* partition = hash_partitions_[i];
    if (!partition->IsClosed() && partition->is_spilled()) ++num_spilled_partitions;
  }
  // Streams may already exist from a previous call; only create the deficit.
  int probe_streams_to_create =
      num_spilled_partitions - spilled_partition_probe_streams_.size();

  while (probe_streams_to_create > 0) {
    // Create stream in vector, so that it will be cleaned up after any failure.
    spilled_partition_probe_streams_.emplace_back(
        make_unique<BufferedTupleStream>(runtime_state_, probe_row_desc_,
            buffer_pool_client_, spillable_buffer_size_, max_row_buffer_size_));
    BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get();
    RETURN_IF_ERROR(probe_stream->Init(join_node_label_, false));

    // Loop until either the stream gets a buffer or all partitions are spilled (in which
    // case SpillPartition() returns an error).
    while (true) {
      bool got_buffer;
      RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer));
      if (got_buffer) break;

      Partition* spilled_partition;
      RETURN_IF_ERROR(SpillPartition(
          BufferedTupleStream::UNPIN_ALL, &spilled_partition));
      // Don't need to create a probe stream for the null-aware partition.
      if (spilled_partition != null_aware_partition_) ++probe_streams_to_create;
    }
    --probe_streams_to_create;
  }
  return Status::OK();
}
| |
| vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() { |
| return std::move(spilled_partition_probe_streams_); |
| } |
| |
// Called when probing of the current hash partitions is finished. Spilled
// partitions are either retained (per 'retain_partition') or closed; in-memory
// partitions are appended to 'output_partitions' when the join must still
// output unmatched build rows, otherwise closed with their buffers attached
// to 'batch'. Clears 'hash_partitions_'.
void PhjBuilder::DoneProbing(const bool retain_partition[PARTITION_FANOUT],
    list<Partition*>* output_partitions, RowBatch* batch) {
  DCHECK(output_partitions->empty());
  for (int i = 0; i < PARTITION_FANOUT; ++i) {
    PhjBuilder::Partition* partition = hash_partitions_[i];
    if (partition->IsClosed()) continue;
    if (partition->is_spilled()) {
      DCHECK(partition->hash_tbl() == nullptr) << DebugString();
      DCHECK_EQ(partition->build_rows()->BytesPinned(false), 0)
          << "Build was fully unpinned in BuildHashTablesAndPrepareProbeStreams()";
      // Release resources associated with completed partitions.
      if (!retain_partition[i]) partition->Close(nullptr);
    } else if (NeedToProcessUnmatchedBuildRows(join_op_)) {
      output_partitions->push_back(partition);
    } else {
      // No more processing is required for this partition.
      partition->Close(batch);
    }
  }
  hash_partitions_.clear();
}
| |
| void PhjBuilder::CloseAndDeletePartitions(RowBatch* row_batch) { |
| // Close all the partitions and clean up all references to them. |
| for (unique_ptr<Partition>& partition : all_partitions_) partition->Close(row_batch); |
| all_partitions_.clear(); |
| hash_partitions_.clear(); |
| null_aware_partition_ = NULL; |
| for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) { |
| // Streams need to be cleaned up, but were never handed off to probe, so won't have |
| // any data in them. |
| DCHECK_EQ(0, stream->num_rows()); |
| stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); |
| } |
| spilled_partition_probe_streams_.clear(); |
| } |
| |
| void PhjBuilder::AllocateRuntimeFilters() { |
| DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filter_ctxs_.size() == 0) |
| << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN"; |
| DCHECK(ht_ctx_ != NULL); |
| for (int i = 0; i < filter_ctxs_.size(); ++i) { |
| if (filter_ctxs_[i].filter->is_bloom_filter()) { |
| filter_ctxs_[i].local_bloom_filter = |
| runtime_state_->filter_bank()->AllocateScratchBloomFilter( |
| filter_ctxs_[i].filter->id()); |
| } else { |
| DCHECK(filter_ctxs_[i].filter->is_min_max_filter()); |
| filter_ctxs_[i].local_min_max_filter = |
| runtime_state_->filter_bank()->AllocateScratchMinMaxFilter( |
| filter_ctxs_[i].filter->id(), filter_ctxs_[i].expr_eval->root().type()); |
| } |
| } |
| } |
| |
| void PhjBuilder::InsertRuntimeFilters(TupleRow* build_row) noexcept { |
| for (const FilterContext& ctx : filter_ctxs_) ctx.Insert(build_row); |
| } |
| |
// Sends the locally-built runtime filters to the filter bank for aggregation/
// publication. Bloom filters whose estimated false-positive rate is too high
// for 'num_build_rows' are replaced with an always-true filter. Also records a
// summary info string in the profile.
void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
  int32_t num_enabled_filters = 0;
  // Use 'num_build_rows' to estimate FP-rate of each Bloom filter, and publish
  // 'always-true' filters if it's too high. Doing so saves CPU at the coordinator,
  // serialisation time, and reduces the cost of applying the filter at the scan - most
  // significantly for per-row filters. However, the number of build rows could be a very
  // poor estimate of the NDV - particularly if the filter expression is a function of
  // several columns.
  // TODO: Better heuristic.
  for (const FilterContext& ctx : filter_ctxs_) {
    // TODO: Consider checking actual number of bits set in filter to compute FP rate.
    // TODO: Consider checking this every few batches or so.
    BloomFilter* bloom_filter = nullptr;
    if (ctx.local_bloom_filter != nullptr) {
      if (runtime_state_->filter_bank()->FpRateTooHigh(
              ctx.filter->filter_size(), num_build_rows)) {
        bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
      } else {
        bloom_filter = ctx.local_bloom_filter;
        ++num_enabled_filters;
      }
    } else if (ctx.local_min_max_filter != nullptr
        && !ctx.local_min_max_filter->AlwaysTrue()) {
      // Min-max filters are published as-is; they only count as enabled when
      // they can actually reject rows.
      ++num_enabled_filters;
    }

    runtime_state_->filter_bank()->UpdateFilterFromLocal(
        ctx.filter->id(), bloom_filter, ctx.local_min_max_filter);
  }

  if (filter_ctxs_.size() > 0) {
    string info_string;
    if (num_enabled_filters == filter_ctxs_.size()) {
      info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filter_ctxs_.size(),
          filter_ctxs_.size() == 1 ? "" : "s");
    } else {
      info_string = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
          num_enabled_filters, filter_ctxs_.size(), filter_ctxs_.size() == 1 ? "" : "s",
          filter_ctxs_.size() - num_enabled_filters);
    }
    profile()->AddInfoString("Runtime filters", info_string);
  }
}
| |
// Repartitions the build rows of spilled 'input_partition' into a fresh set of
// hash partitions at 'level' (>= 1, i.e. a different hash function than the
// level that produced it), then closes the input partition and finalizes via
// FlushFinal(). 'input_probe_rows' is the matching probe stream; a read buffer
// is secured for it before the hash tables are built.
Status PhjBuilder::RepartitionBuildInput(
    Partition* input_partition, int level, BufferedTupleStream* input_probe_rows) {
  DCHECK_GE(level, 1);
  SCOPED_TIMER(repartition_timer_);
  COUNTER_ADD(num_repartitions_, 1);
  RuntimeState* state = runtime_state_;

  // Setup the read buffer and the new partitions.
  BufferedTupleStream* build_rows = input_partition->build_rows();
  DCHECK(build_rows != NULL);
  bool got_read_buffer;
  RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer));
  if (!got_read_buffer) {
    return mem_tracker()->MemLimitExceeded(
        state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, join_node_id_));
  }
  RETURN_IF_ERROR(CreateHashPartitions(level));

  // Repartition 'input_stream' into 'hash_partitions_' by re-feeding the rows
  // through the normal Send() path.
  RowBatch build_batch(row_desc_, state->batch_size(), mem_tracker());
  bool eos = false;
  while (!eos) {
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(state->CheckQueryState());

    RETURN_IF_ERROR(build_rows->GetNext(&build_batch, &eos));
    RETURN_IF_ERROR(Send(state, &build_batch));
    build_batch.Reset();
  }

  // Done reading the input, we can safely close it now to free memory.
  input_partition->Close(NULL);

  // We just freed up the buffer used for reading build rows. Ensure a buffer is
  // allocated for reading probe rows before we build the hash tables in FlushFinal().
  // TODO: once we have reliable reservations (IMPALA-3200) we can just hand off the
  // reservation and avoid this complication.
  while (true) {
    bool got_buffer;
    RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer));
    if (got_buffer) break;
    RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
  }

  RETURN_IF_ERROR(FlushFinal(state));
  return Status::OK();
}
| |
| int64_t PhjBuilder::LargestPartitionRows() const { |
| int64_t max_rows = 0; |
| for (int i = 0; i < hash_partitions_.size(); ++i) { |
| Partition* partition = hash_partitions_[i]; |
| DCHECK(partition != NULL); |
| if (partition->IsClosed()) continue; |
| int64_t rows = partition->build_rows()->num_rows(); |
| if (rows > max_rows) max_rows = rows; |
| } |
| return max_rows; |
| } |
| |
| bool PhjBuilder::HashTableStoresNulls() const { |
| return join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN |
| || join_op_ == TJoinOp::FULL_OUTER_JOIN |
| || std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), |
| false, std::logical_or<bool>()); |
| } |
| |
// A partition starts unspilled with an empty build stream; the hash table is
// only created later by BuildHashTable().
PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level)
  : parent_(parent), is_spilled_(false), level_(level) {
  build_rows_ = make_unique<BufferedTupleStream>(state, parent_->row_desc_,
      parent_->buffer_pool_client_, parent->spillable_buffer_size_,
      parent->max_row_buffer_size_);
}
| |
// Close() must have been called before destruction to release the stream and
// hash table resources.
PhjBuilder::Partition::~Partition() {
  DCHECK(IsClosed());
}
| |
| int64_t PhjBuilder::Partition::EstimatedInMemSize() const { |
| return build_rows_->byte_size() + HashTable::EstimateSize(build_rows_->num_rows()); |
| } |
| |
// Releases the hash table (after folding its stats into the parent's profile)
// and the build stream. Idempotent.
void PhjBuilder::Partition::Close(RowBatch* batch) {
  if (IsClosed()) return;

  if (hash_tbl_ != NULL) {
    hash_tbl_->StatsCountersAdd(parent_->ht_stats_profile_.get());
    hash_tbl_->Close();
  }

  // Transfer ownership of 'build_rows_' memory to 'batch' if 'batch' is not NULL.
  // Flush out the resources to free up memory for subsequent partitions.
  if (build_rows_ != NULL) {
    build_rows_->Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES);
    build_rows_.reset();
  }
}
| |
// Spills this partition: destroys the hash table (if any) and unpins the build
// stream in 'mode'. StartSpilling() is called first so the runtime can veto or
// record the transition to spilling. The first spill of the node appends a
// "Spilled" exec option to the profile.
Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) {
  DCHECK(!IsClosed());
  RETURN_IF_ERROR(parent_->runtime_state_->StartSpilling(parent_->mem_tracker()));
  // Close the hash table and unpin the stream backing it to free memory.
  if (hash_tbl() != NULL) {
    hash_tbl_->Close();
    hash_tbl_.reset();
  }
  RETURN_IF_ERROR(build_rows_->UnpinStream(mode));
  if (!is_spilled_) {
    COUNTER_ADD(parent_->num_spilled_partitions_, 1);
    // Only the very first spilled partition adds the profile annotation.
    if (parent_->num_spilled_partitions_->value() == 1) {
      parent_->profile()->AppendExecOption("Spilled");
    }
  }
  is_spilled_ = true;
  return Status::OK();
}
| |
// Pins the build stream and builds an in-memory hash table over its rows.
// '*built' is set to true only on full success; if pinning fails or memory
// runs out mid-build, the partial hash table is torn down (label 'not_built')
// and the caller is expected to spill this partition. Uses the codegen'd
// insert function when available.
Status PhjBuilder::Partition::BuildHashTable(bool* built) {
  SCOPED_TIMER(parent_->build_hash_table_timer_);
  DCHECK(build_rows_ != NULL);
  *built = false;

  // Before building the hash table, we need to pin the rows in memory.
  // PinStream() sets '*built' to whether pinning succeeded.
  RETURN_IF_ERROR(build_rows_->PinStream(built));
  if (!*built) return Status::OK();

  RuntimeState* state = parent_->runtime_state_;
  HashTableCtx* ctx = parent_->ht_ctx_.get();
  ctx->set_level(level()); // Set the hash function for building the hash table.
  RowBatch batch(parent_->row_desc_, state->batch_size(), parent_->mem_tracker());
  vector<BufferedTupleStream::FlatRowPtr> flat_rows;
  bool eos = false;

  // Allocate the partition-local hash table. Initialize the number of buckets based on
  // the number of build rows (the number of rows is known at this point). This assumes
  // there are no duplicates which can be wrong. However, the upside in the common case
  // (few/no duplicates) is large and the downside when there are is low (a bit more
  // memory; the bucket memory is small compared to the memory needed for all the build
  // side allocations).
  // One corner case is if the stream contains tuples with zero footprint (no materialized
  // slots). If the tuples occupy no space, this implies all rows will be duplicates, so
  // create a small hash table, IMPALA-2256.
  //
  // TODO: Try to allocate the hash table before pinning the stream to avoid needlessly
  // reading all of the spilled rows from disk when we won't succeed anyway.
  int64_t estimated_num_buckets = HashTable::EstimateNumBuckets(build_rows()->num_rows());
  hash_tbl_.reset(HashTable::Create(parent_->ht_allocator_.get(),
      true /* store_duplicates */, parent_->row_desc_->tuple_descriptors().size(),
      build_rows(), 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets));
  bool success;
  Status status = hash_tbl_->Init(&success);
  if (!status.ok() || !success) goto not_built;
  // Read back the (now fully pinned) stream to insert each row.
  status = build_rows_->PrepareForRead(false, &success);
  if (!status.ok()) goto not_built;
  DCHECK(success) << "Stream was already pinned.";

  do {
    status = build_rows_->GetNext(&batch, &eos, &flat_rows);
    if (!status.ok()) goto not_built;
    DCHECK_EQ(batch.num_rows(), flat_rows.size());
    DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets());
    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
    if (parent_->insert_batch_fn_ != NULL) {
      // Codegen'd path: pick the level-0 or generic variant to match the
      // current hash function.
      InsertBatchFn insert_batch_fn;
      if (level() == 0) {
        insert_batch_fn = parent_->insert_batch_fn_level0_;
      } else {
        insert_batch_fn = parent_->insert_batch_fn_;
      }
      DCHECK(insert_batch_fn != NULL);
      if (UNLIKELY(
              !insert_batch_fn(this, prefetch_mode, ctx, &batch, flat_rows, &status))) {
        goto not_built;
      }
    } else if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, flat_rows, &status))) {
      goto not_built;
    }
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(state->GetQueryStatus());
    // Free any expr result allocations made while inserting.
    parent_->expr_results_pool_->Clear();
    batch.Reset();
  } while (!eos);

  // The hash table fits in memory and is built.
  DCHECK(*built);
  DCHECK(hash_tbl_ != NULL);
  is_spilled_ = false;
  COUNTER_ADD(parent_->ht_stats_profile_->num_hash_buckets_,
      hash_tbl_->num_buckets());
  return Status::OK();

not_built:
  // Roll back: destroy any partially-built hash table and report not built.
  *built = false;
  if (hash_tbl_ != NULL) {
    hash_tbl_->Close();
    hash_tbl_.reset();
  }
  return status;
}
| |
| std::string PhjBuilder::Partition::DebugString() { |
| stringstream ss; |
| ss << "<Partition>: ptr=" << this; |
| if (IsClosed()) { |
| ss << " Closed"; |
| return ss.str(); |
| } |
| if (is_spilled()) { |
| ss << " Spilled"; |
| } |
| DCHECK(build_rows() != nullptr); |
| ss << endl |
| << " Build Rows: " << build_rows_->num_rows() |
| << " (Bytes pinned: " << build_rows_->BytesPinned(false) << ")" |
| << endl; |
| if (hash_tbl_ != NULL) { |
| ss << " Hash Table Rows: " << hash_tbl_->size(); |
| } |
| return ss.str(); |
| } |
| |
| void PhjBuilder::Codegen(LlvmCodeGen* codegen) { |
| Status build_codegen_status; |
| Status insert_codegen_status; |
| Status codegen_status; |
| |
| // Codegen for hashing rows with the builder's hash table context. |
| llvm::Function* hash_fn; |
| codegen_status = ht_ctx_->CodegenHashRow(codegen, false, &hash_fn); |
| llvm::Function* murmur_hash_fn; |
| codegen_status.MergeStatus(ht_ctx_->CodegenHashRow(codegen, true, &murmur_hash_fn)); |
| |
| // Codegen for evaluating build rows |
| llvm::Function* eval_build_row_fn; |
| codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(codegen, true, &eval_build_row_fn)); |
| |
| llvm::Function* insert_filters_fn; |
| codegen_status.MergeStatus( |
| CodegenInsertRuntimeFilters(codegen, filter_exprs_, &insert_filters_fn)); |
| |
| if (codegen_status.ok()) { |
| TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode; |
| build_codegen_status = CodegenProcessBuildBatch( |
| codegen, hash_fn, murmur_hash_fn, eval_build_row_fn, insert_filters_fn); |
| insert_codegen_status = CodegenInsertBatch(codegen, hash_fn, murmur_hash_fn, |
| eval_build_row_fn, prefetch_mode); |
| } else { |
| build_codegen_status = codegen_status; |
| insert_codegen_status = codegen_status; |
| } |
| profile()->AddCodegenMsg(build_codegen_status.ok(), build_codegen_status, "Build Side"); |
| profile()->AddCodegenMsg(insert_codegen_status.ok(), insert_codegen_status, |
| "Hash Table Construction"); |
| } |
| |
| string PhjBuilder::DebugString() const { |
| stringstream ss; |
| ss << "Hash partitions: " << hash_partitions_.size() << ":" << endl; |
| for (int i = 0; i < hash_partitions_.size(); ++i) { |
| ss << " Hash partition " << i << " " << hash_partitions_[i]->DebugString() << endl; |
| } |
| if (null_aware_partition_ != nullptr) { |
| ss << "Null-aware partition: " << null_aware_partition_->DebugString(); |
| } |
| return ss.str(); |
| } |
| |
| Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn, |
| llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn, |
| llvm::Function* insert_filters_fn) { |
| llvm::Function* process_build_batch_fn = |
| codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true); |
| DCHECK(process_build_batch_fn != NULL); |
| |
| // Replace call sites |
| int replaced = |
| codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn, "EvalBuildRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| replaced = codegen->ReplaceCallSites( |
| process_build_batch_fn, insert_filters_fn, "InsertRuntimeFilters"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| // Replace some hash table parameters with constants. |
| HashTableCtx::HashTableReplacedConstants replaced_constants; |
| const bool stores_duplicates = true; |
| const int num_build_tuples = row_desc_->tuple_descriptors().size(); |
| RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates, |
| num_build_tuples, process_build_batch_fn, &replaced_constants)); |
| DCHECK_GE(replaced_constants.stores_nulls, 1); |
| DCHECK_EQ(replaced_constants.finds_some_nulls, 0); |
| DCHECK_EQ(replaced_constants.stores_duplicates, 0); |
| DCHECK_EQ(replaced_constants.stores_tuples, 0); |
| DCHECK_EQ(replaced_constants.quadratic_probing, 0); |
| |
| llvm::Value* is_null_aware_arg = codegen->GetArgument(process_build_batch_fn, 5); |
| is_null_aware_arg->replaceAllUsesWith( |
| codegen->GetBoolConstant(join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN)); |
| |
| llvm::Function* process_build_batch_fn_level0 = |
| codegen->CloneFunction(process_build_batch_fn); |
| |
| // Always build runtime filters at level0 (if there are any). |
| // Note that the first argument of this function is the return value. |
| llvm::Value* build_filter_l0_arg = |
| codegen->GetArgument(process_build_batch_fn_level0, 4); |
| build_filter_l0_arg->replaceAllUsesWith( |
| codegen->GetBoolConstant(filter_ctxs_.size() > 0)); |
| |
| // process_build_batch_fn_level0 uses CRC hash if available, |
| replaced = |
| codegen->ReplaceCallSites(process_build_batch_fn_level0, hash_fn, "HashRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| // process_build_batch_fn uses murmur |
| replaced = |
| codegen->ReplaceCallSites(process_build_batch_fn, murmur_hash_fn, "HashRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| // Never build filters after repartitioning, as all rows have already been added to the |
| // filters during the level0 build. Note that the first argument of this function is the |
| // return value. |
| llvm::Value* build_filter_arg = codegen->GetArgument(process_build_batch_fn, 4); |
| build_filter_arg->replaceAllUsesWith(codegen->false_value()); |
| |
| // Finalize ProcessBuildBatch functions |
| process_build_batch_fn = codegen->FinalizeFunction(process_build_batch_fn); |
| if (process_build_batch_fn == NULL) { |
| return Status( |
| "Codegen'd PhjBuilder::ProcessBuildBatch() function " |
| "failed verification, see log"); |
| } |
| process_build_batch_fn_level0 = |
| codegen->FinalizeFunction(process_build_batch_fn_level0); |
| if (process_build_batch_fn == NULL) { |
| return Status( |
| "Codegen'd level-zero PhjBuilder::ProcessBuildBatch() " |
| "function failed verification, see log"); |
| } |
| |
| // Register native function pointers |
| codegen->AddFunctionToJit( |
| process_build_batch_fn, reinterpret_cast<void**>(&process_build_batch_fn_)); |
| codegen->AddFunctionToJit(process_build_batch_fn_level0, |
| reinterpret_cast<void**>(&process_build_batch_fn_level0_)); |
| return Status::OK(); |
| } |
| |
| Status PhjBuilder::CodegenInsertBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn, |
| llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn, |
| TPrefetchMode::type prefetch_mode) { |
| llvm::Function* insert_batch_fn = |
| codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true); |
| llvm::Function* build_equals_fn; |
| RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &build_equals_fn)); |
| |
| // Replace the parameter 'prefetch_mode' with constant. |
| llvm::Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1); |
| DCHECK_GE(prefetch_mode, TPrefetchMode::NONE); |
| DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET); |
| prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode)); |
| |
| // Use codegen'd EvalBuildRow() function |
| int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| // Use codegen'd Equals() function |
| replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| // Replace hash-table parameters with constants. |
| HashTableCtx::HashTableReplacedConstants replaced_constants; |
| const bool stores_duplicates = true; |
| const int num_build_tuples = row_desc_->tuple_descriptors().size(); |
| RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates, |
| num_build_tuples, insert_batch_fn, &replaced_constants)); |
| DCHECK_GE(replaced_constants.stores_nulls, 1); |
| DCHECK_EQ(replaced_constants.finds_some_nulls, 0); |
| DCHECK_GE(replaced_constants.stores_duplicates, 1); |
| DCHECK_GE(replaced_constants.stores_tuples, 1); |
| DCHECK_GE(replaced_constants.quadratic_probing, 1); |
| |
| llvm::Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn); |
| |
| // Use codegen'd hash functions |
| replaced = codegen->ReplaceCallSites(insert_batch_fn_level0, hash_fn, "HashRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| replaced = codegen->ReplaceCallSites(insert_batch_fn, murmur_hash_fn, "HashRow"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn); |
| if (insert_batch_fn == NULL) { |
| return Status( |
| "PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd " |
| "InsertBatch() function failed verification, see log"); |
| } |
| insert_batch_fn_level0 = codegen->FinalizeFunction(insert_batch_fn_level0); |
| if (insert_batch_fn_level0 == NULL) { |
| return Status( |
| "PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd zero-level " |
| "InsertBatch() function failed verification, see log"); |
| } |
| |
| codegen->AddFunctionToJit(insert_batch_fn, reinterpret_cast<void**>(&insert_batch_fn_)); |
| codegen->AddFunctionToJit( |
| insert_batch_fn_level0, reinterpret_cast<void**>(&insert_batch_fn_level0_)); |
| return Status::OK(); |
| } |
| |
| // An example of the generated code for a query with two filters built by this node. |
| // |
| // ; Function Attrs: noinline |
| // define void @InsertRuntimeFilters(%"class.impala::PhjBuilder"* %this, |
| // %"class.impala::TupleRow"* %row) #46 { |
| // entry: |
| // call void @FilterContextInsert(%"struct.impala::FilterContext"* inttoptr ( |
| // i64 197870464 to %"struct.impala::FilterContext"*), |
| // %"class.impala::TupleRow"* %row) |
| // call void @FilterContextInsert.14(%"struct.impala::FilterContext"* inttoptr ( |
| // i64 197870496 to %"struct.impala::FilterContext"*), |
| // %"class.impala::TupleRow"* %row) |
| // ret void |
| // } |
| Status PhjBuilder::CodegenInsertRuntimeFilters( |
| LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, llvm::Function** fn) { |
| llvm::LLVMContext& context = codegen->context(); |
| LlvmBuilder builder(context); |
| |
| *fn = nullptr; |
| llvm::Type* this_type = codegen->GetStructPtrType<PhjBuilder>(); |
| llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>(); |
| LlvmCodeGen::FnPrototype prototype( |
| codegen, "InsertRuntimeFilters", codegen->void_type()); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); |
| |
| llvm::Value* args[2]; |
| llvm::Function* insert_runtime_filters_fn = prototype.GeneratePrototype(&builder, args); |
| llvm::Value* row_arg = args[1]; |
| |
| int num_filters = filter_exprs.size(); |
| for (int i = 0; i < num_filters; ++i) { |
| llvm::Function* insert_fn; |
| RETURN_IF_ERROR(FilterContext::CodegenInsert( |
| codegen, filter_exprs_[i], &filter_ctxs_[i], &insert_fn)); |
| llvm::PointerType* filter_context_type = codegen->GetStructPtrType<FilterContext>(); |
| llvm::Value* filter_context_ptr = |
| codegen->CastPtrToLlvmPtr(filter_context_type, &filter_ctxs_[i]); |
| |
| llvm::Value* insert_args[] = {filter_context_ptr, row_arg}; |
| builder.CreateCall(insert_fn, insert_args); |
| } |
| |
| builder.CreateRetVoid(); |
| |
| if (num_filters > 0) { |
| // Don't inline this function to avoid code bloat in ProcessBuildBatch(). |
| // If there is any filter, InsertRuntimeFilters() is large enough to not benefit |
| // much from inlining. |
| insert_runtime_filters_fn->addFnAttr(llvm::Attribute::NoInline); |
| } |
| |
| *fn = codegen->FinalizeFunction(insert_runtime_filters_fn); |
| if (*fn == nullptr) { |
| return Status("Codegen'd PhjBuilder::InsertRuntimeFilters() failed " |
| "verification, see log"); |
| } |
| return Status::OK(); |
| } |