// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/topn-node.h"
#include "common/compiler-util.h"
#include "util/debug-util.h"
#include "common/names.h"
using namespace impala;
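
// Entry point for the unpartitioned TOP-N case: materializes and inserts each row of
// 'batch' into the single in-memory heap, accumulating the number of replaced rows so
// their memory can later be reclaimed.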
void TopNNode::InsertBatchUnpartitioned(RuntimeState* state, RowBatch* batch) {
  DCHECK(!is_partitioned());
  // TODO: after inlining the comparator calls with codegen - IMPALA-4065 - we could
  // probably squeeze more performance out of this loop by ensuring that as many loads
  // are hoisted out of the loop as possible (either via code changes or __restrict__
  // annotations).
  FOREACH_ROW(batch, 0, iter) {
    int num_to_reclaim = heap_->InsertTupleRow(this, iter.Get());
    // Using a branch here instead of adding the value directly improves
    // top-n performance by a couple of %.
    if (num_to_reclaim != 0) rows_to_reclaim_ += num_to_reclaim;
  }
}
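
// Materializes 'input_row' into the output tuple layout and inserts it into the heap.
// Rows are added unconditionally until the heap reaches capacity; after that a row is
// only inserted if it sorts before the current head (or ties with it when ties are
// included). The returned count feeds the caller's bookkeeping of rows whose memory
// can later be reclaimed.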
int TopNNode::Heap::InsertTupleRow(TopNNode* node, TupleRow* input_row) {
  const TupleDescriptor& tuple_desc = *node->output_tuple_desc_;
  if (priority_queue_.Size() < heap_capacity()) {
    // Add all tuples (including any ties) until we hit capacity.
    Tuple* insert_tuple = reinterpret_cast<Tuple*>(
        node->tuple_pool_->Allocate(node->tuple_byte_size()));
    insert_tuple->MaterializeExprs<false, false>(input_row, tuple_desc,
        node->output_tuple_expr_evals_, node->tuple_pool_.get());
    priority_queue_.Push(insert_tuple);
    return 0;
  }

  // We're at capacity - compare to the first row in the priority queue to see if
  // we need to insert this row into the queue.
  DCHECK(!priority_queue_.Empty());
  Tuple* top_tuple = priority_queue_.Top();
  node->tmp_tuple_->MaterializeExprs<false, true>(input_row, tuple_desc,
      node->output_tuple_expr_evals_, nullptr);
  if (include_ties()) {
    return InsertTupleWithTieHandling(*node->order_cmp_, node, node->tmp_tuple_);
  } else {
    if (node->order_cmp_->Less(node->tmp_tuple_, top_tuple)) {
      // Pop off the old head, and replace with the new tuple. Deep copy into
      // 'top_tuple' to reuse the fixed-length memory of 'top_tuple'.
      node->tmp_tuple_->DeepCopy(top_tuple, tuple_desc, node->tuple_pool_.get());
      // Re-heapify from the top element and down.
      priority_queue_.HeapifyFromTop();
      return 1;
    }
    return 0;
  }
}
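
// Handles an insert into a heap that is already at capacity when rows tied with the
// current head must be retained: tuples equal to the head under 'cmp' accumulate in
// 'overflowed_ties_', tuples that sort after the head are rejected, and tuples that
// sort before the head evict it (plus any accumulated ties that no longer tie with
// the new head). Returns the number of tuples rejected or evicted by this insert.
//
// Illustrative example (hypothetical values, ORDER BY x ASC LIMIT 3 WITH TIES): with
// heap contents {1, 2, 5} and overflowed_ties_ = {5}, inserting another 5 is kept as
// an extra tie (returns 0), inserting 7 is rejected (returns 1), and inserting 4
// evicts both 5s, leaving {1, 2, 4} (returns 2).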
int TopNNode::Heap::InsertTupleWithTieHandling(
    const TupleRowComparator& cmp, TopNNode* node, Tuple* materialized_tuple) {
  DCHECK(include_ties());
  DCHECK_EQ(capacity_, priority_queue_.Size())
      << "Ties only need special handling when heap is at capacity";
  const TupleDescriptor& tuple_desc = *node->output_tuple_desc_;
  Tuple* top_tuple = priority_queue_.Top();
  // If we need to retain ties with the current head, the logic is more complex - we
  // have a logical heap in indices [0, heap_capacity()) of 'priority_queue_' plus
  // some number of tuples in 'overflowed_ties_' that are equal to
  // priority_queue_.Top() according to 'cmp'.
  int cmp_result = cmp.Compare(materialized_tuple, top_tuple);
  if (cmp_result == 0) {
    // This is a duplicate of the current min, so we need to include it as a tie with
    // the min.
    Tuple* insert_tuple =
        reinterpret_cast<Tuple*>(node->tuple_pool_->Allocate(node->tuple_byte_size()));
    materialized_tuple->DeepCopy(insert_tuple, tuple_desc, node->tuple_pool_.get());
    overflowed_ties_.push_back(insert_tuple);
    return 0;
  } else if (cmp_result > 0) {
    // Tuple does not belong in the heap.
    return 1;
  } else {
    // 'materialized_tuple' needs to be added. Figure out which other tuples, if any,
    // need to be removed from the heap.
    DCHECK_LT(cmp_result, 0);
    // Pop off the head.
    priority_queue_.Pop();
    // Check if 'top_tuple' (the tuple we just popped off) is tied with the new head.
    if (heap_capacity() > 1 && cmp.Compare(top_tuple, priority_queue_.Top()) == 0) {
      // The new top is still tied with the tuples in 'overflowed_ties_' so we must
      // keep it. The previous top becomes another overflowed tuple.
      overflowed_ties_.push_back(top_tuple);
      Tuple* insert_tuple =
          reinterpret_cast<Tuple*>(node->tuple_pool_->Allocate(node->tuple_byte_size()));
      materialized_tuple->DeepCopy(insert_tuple, tuple_desc, node->tuple_pool_.get());
      // Push the new tuple onto the heap, but retain the tied tuple.
      priority_queue_.Push(insert_tuple);
      return 0;
    } else {
      // No tuples tied with 'top_tuple' are left.
      // Reuse the fixed-length memory of 'top_tuple' to reduce allocations.
      int64_t num_rows_replaced = overflowed_ties_.size() + 1;
      overflowed_ties_.clear();
      materialized_tuple->DeepCopy(top_tuple, tuple_desc, node->tuple_pool_.get());
      priority_queue_.Push(top_tuple);
      return num_rows_replaced;
    }
  }
}
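
// Entry point for the partitioned TOP-N case: materializes each row of 'batch' and
// routes it to the in-memory heap for its partition, lazily creating a heap the first
// time a partition key is seen.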
void TopNNode::InsertBatchPartitioned(RuntimeState* state, RowBatch* batch) {
  DCHECK(is_partitioned());
  // Insert all of the rows in the batch into the per-partition heaps. The soft memory
  // limit will be checked later, in case this batch put us over the limit.
  FOREACH_ROW(batch, 0, iter) {
    tmp_tuple_->MaterializeExprs<false, true>(
        iter.Get(), *output_tuple_desc_, output_tuple_expr_evals_, nullptr);
    // TODO: IMPALA-10228: the comparator won't get inlined by codegen here.
    auto it = partition_heaps_.find(tmp_tuple_);
    Heap* new_heap = nullptr;
    Heap* heap;
    if (it == partition_heaps_.end()) {
      // Allocate the heap here, but insert it into 'partition_heaps_' later once we've
      // initialized the tuple that will be the key.
      new_heap =
          new Heap(*intra_partition_order_cmp_, per_partition_limit(), include_ties());
      heap = new_heap;
      COUNTER_ADD(in_mem_heap_created_counter_, 1);
    } else {
      heap = it->second.get();
    }
    heap->InsertMaterializedTuple(this, tmp_tuple_);
    if (new_heap != nullptr) {
      DCHECK_GT(new_heap->num_tuples(), 0);
      // Add the new heap with the first tuple as the key.
      partition_heaps_.emplace(new_heap->top(), unique_ptr<Heap>(new_heap));
    }
  }
}
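
// Partitioned-heap counterpart of InsertTupleRow: inserts an already-materialized
// tuple, deep-copying it into the node's tuple pool. Until capacity is reached every
// tuple is kept; afterwards a tuple either replaces the current head, accumulates as
// a tie (when ties are included), or is discarded, with 'num_tuples_discarded_'
// tracking the discarded count.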
void TopNNode::Heap::InsertMaterializedTuple(
    TopNNode* node, Tuple* materialized_tuple) {
  DCHECK(node->is_partitioned());
  const TupleDescriptor& tuple_desc = *node->output_tuple_desc_;
  Tuple* insert_tuple = nullptr;
  if (priority_queue_.Size() < heap_capacity()) {
    // Add all tuples until we hit capacity.
    insert_tuple =
        reinterpret_cast<Tuple*>(node->tuple_pool_->Allocate(node->tuple_byte_size()));
    materialized_tuple->DeepCopy(insert_tuple, tuple_desc, node->tuple_pool_.get());
    priority_queue_.Push(insert_tuple);
    return;
  }

  // We're at capacity - compare to the first row in the priority queue to see if
  // we need to insert this row into the heap.
  DCHECK(!priority_queue_.Empty());
  Tuple* top_tuple = priority_queue_.Top();
  if (!include_ties()) {
    ++num_tuples_discarded_; // One of the tuples will be discarded.
    int cmp_result =
        node->intra_partition_order_cmp_->Compare(materialized_tuple, top_tuple);
    if (cmp_result >= 0) return;
    // Pop off the old head, and replace with the new tuple. Reuse the fixed-length
    // memory of 'top_tuple' to reduce allocations.
    materialized_tuple->DeepCopy(top_tuple, tuple_desc, node->tuple_pool_.get());
    priority_queue_.HeapifyFromTop();
    return;
  }
  num_tuples_discarded_ += InsertTupleWithTieHandling(
      *node->intra_partition_order_cmp_, node, materialized_tuple);
}