// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/sorter-internal.h"
#include <boost/random/uniform_int.hpp>
#include "runtime/exec-env.h"
#include "runtime/runtime-state.h"
#include "util/runtime-profile-counters.h"
namespace impala {
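// Advance the iterator's cached page state to the next fixed-length page of
// 'run', updating 'page_index_', the buffer index bounds and 'tuple_'. Called by
// Next() once 'index_' has crossed 'buffer_end_index_'.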
void IR_ALWAYS_INLINE Sorter::TupleIterator::NextPage(Sorter::Run* run) {
// When moving after the last tuple, stay at the last page.
if (index_ >= run->num_tuples()) return;
++page_index_;
DCHECK_LT(page_index_, run->fixed_len_pages_.size());
buffer_start_index_ = page_index_ * run->page_capacity_;
DCHECK_EQ(index_, buffer_start_index_);
buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
tuple_ = run->fixed_len_pages_[page_index_].data();
}
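// Move the iterator's cached page state to the previous fixed-length page of
// 'run'. Called by Prev() once 'index_' has dropped below 'buffer_start_index_'.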
void IR_ALWAYS_INLINE Sorter::TupleIterator::PrevPage(Sorter::Run* run) {
// When moving before the first tuple, stay at the first page.
if (index_ < 0) return;
--page_index_;
DCHECK_GE(page_index_, 0);
buffer_start_index_ = page_index_ * run->page_capacity_;
buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
DCHECK_EQ(index_, buffer_end_index_ - 1);
int last_tuple_page_offset = run->sort_tuple_size_ * (run->page_capacity_ - 1);
tuple_ = run->fixed_len_pages_[page_index_].data() + last_tuple_page_offset;
}
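// Advance the iterator by one tuple, crossing into the next page when the end of
// the current page is reached. May advance at most one position past the end of
// the run.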
// IMPALA-3816: Function is not inlined into Partition() without IR_ALWAYS_INLINE hint.
void IR_ALWAYS_INLINE Sorter::TupleIterator::Next(Sorter::Run* run, int tuple_size) {
DCHECK_LT(index_, run->num_tuples()) << "Can only advance one past end of run";
tuple_ += tuple_size;
++index_;
if (UNLIKELY(index_ >= buffer_end_index_)) NextPage(run);
}
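// Move the iterator back by one tuple, crossing into the previous page when the
// start of the current page is passed. May retreat at most one position before
// the start of the run.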
// IMPALA-3816: Function is not inlined into Partition() without IR_ALWAYS_INLINE hint.
void IR_ALWAYS_INLINE Sorter::TupleIterator::Prev(Sorter::Run* run, int tuple_size) {
DCHECK_GE(index_, 0) << "Can only retreat one before start of run";
tuple_ -= tuple_size;
--index_;
if (UNLIKELY(index_ < buffer_start_index_)) PrevPage(run);
}
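// The comparator evaluates ordering exprs, which may accumulate allocations in
// 'expr_results_pool_'. To bound that memory, clear the pool after every
// 'batch_size' comparisons.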
void IR_ALWAYS_INLINE Sorter::TupleSorter::FreeExprResultPoolIfNeeded() {
--num_comparisons_till_free_;
DCHECK_GE(num_comparisons_till_free_, 0);
if (UNLIKELY(num_comparisons_till_free_ == 0)) {
parent_->expr_results_pool_.Clear();
num_comparisons_till_free_ = state_->batch_size();
}
}
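// Returns true if 'lhs' sorts before 'rhs', periodically freeing accumulated
// expression results.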
// IMPALA-3816: Function is not inlined into Partition() without IR_ALWAYS_INLINE hint.
bool IR_ALWAYS_INLINE Sorter::TupleSorter::Less(
const TupleRow* lhs, const TupleRow* rhs) {
FreeExprResultPoolIfNeeded();
return comparator_.Less(lhs, rhs);
}
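// Three-way comparison of 'lhs' and 'rhs' (negative, zero or positive),
// periodically freeing accumulated expression results.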
int IR_ALWAYS_INLINE Sorter::TupleSorter::Compare(
const TupleRow* lhs, const TupleRow* rhs) {
FreeExprResultPoolIfNeeded();
return comparator_.Compare(lhs, rhs);
}
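// Partitions the tuples in [begin, end) in-place around 'pivot' with Hoare-style
// two-way partitioning: on return, tuples before '*cut' are <= pivot and tuples
// from '*cut' onwards are >= pivot.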
Status IR_ALWAYS_INLINE Sorter::TupleSorter::Partition2way(TupleIterator begin,
TupleIterator end, const Tuple* pivot, TupleIterator* cut) {
// Hoist member variable lookups out of loop to avoid extra loads inside loop.
Run* run = run_;
const int tuple_size = get_tuple_size();
Tuple* pivot_tuple = reinterpret_cast<Tuple*>(temp_tuple_buffer_);
TupleRow* pivot_tuple_row = reinterpret_cast<TupleRow*>(&pivot_tuple);
Tuple* swap_tuple = reinterpret_cast<Tuple*>(swap_buffer_);
// Copy the pivot into 'pivot_tuple' since 'pivot' points to a tuple within
// [begin, end) that may be overwritten during partitioning.
DCHECK(pivot_tuple != nullptr);
DCHECK(pivot != nullptr);
memcpy(pivot_tuple, pivot, tuple_size);
TupleIterator left = begin;
TupleIterator right = end;
right.Prev(run, tuple_size); // Set 'right' to the last tuple in range.
while (true) {
// Search for the first and last out-of-place elements, and swap them.
while (Less(left.row(), pivot_tuple_row)) {
left.Next(run, tuple_size);
}
while (Less(pivot_tuple_row, right.row())) {
right.Prev(run, tuple_size);
}
if (left.index() >= right.index()) break;
// Swap first and last tuples.
Swap(left.tuple(), right.tuple(), swap_tuple, tuple_size);
left.Next(run, tuple_size);
right.Prev(run, tuple_size);
RETURN_IF_CANCELLED(state_);
RETURN_IF_ERROR(state_->GetQueryStatus());
}
*cut = left;
return Status::OK();
}
// Partitions the tuples into three groups: < pivot, = pivot and > pivot.
// Similar to 2-way quicksort, but elements equal to the pivot are moved to the
// sides while the < pivot and > pivot elements are partitioned. Once all
// non-equal elements are partitioned, the = pivot elements are swapped into
// their final place in the middle, between the < pivot and > pivot groups.
Status IR_ALWAYS_INLINE Sorter::TupleSorter::Partition3way(TupleIterator begin,
TupleIterator end, const Tuple* pivot, TupleIterator* cut_left,
TupleIterator* cut_right) {
// Hoist member variable lookups out of loop to avoid extra loads inside loop.
Run* run = run_;
const int tuple_size = get_tuple_size();
Tuple* pivot_tuple = reinterpret_cast<Tuple*>(temp_tuple_buffer_);
TupleRow* pivot_tuple_row = reinterpret_cast<TupleRow*>(&pivot_tuple);
Tuple* swap_tuple = reinterpret_cast<Tuple*>(swap_buffer_);
// Copy the pivot into 'pivot_tuple' since 'pivot' points to a tuple within
// [begin, end) that may be overwritten during partitioning.
DCHECK(pivot_tuple != nullptr);
DCHECK(pivot != nullptr);
memcpy(pivot_tuple, pivot, tuple_size);
TupleIterator left = begin;
TupleIterator right = end;
right.Prev(run, tuple_size); // Set 'right' to the last tuple in range.
// Tuples equal to the pivot are parked at the two ends of the range while
// partitioning: [begin, equals_left) on the left and [equals_right, end) on
// the right.
TupleIterator equals_left = begin;
TupleIterator equals_right = end;
while (true) {
// Search for elements that are = pivot, and move them to the sides.
// Stops at the first and last out-of-place elements, so they
// will be swapped.
while (left.index() <= right.index()) {
int cmp = Compare(left.row(), pivot_tuple_row);
if (cmp > 0) {
break;
} else if (cmp == 0) {
Swap(equals_left.tuple(), left.tuple(), swap_tuple, tuple_size);
equals_left.Next(run, tuple_size);
}
left.Next(run, tuple_size);
}
while (left.index() <= right.index()) {
int cmp = Compare(pivot_tuple_row, right.row());
if (cmp > 0) {
break;
} else if (cmp == 0) {
equals_right.Prev(run, tuple_size);
Swap(equals_right.tuple(), right.tuple(), swap_tuple, tuple_size);
}
right.Prev(run, tuple_size);
}
if (left.index() >= right.index()) break;
// Swap first and last out-of-place tuples.
Swap(left.tuple(), right.tuple(), swap_tuple, tuple_size);
left.Next(run, tuple_size);
right.Prev(run, tuple_size);
RETURN_IF_CANCELLED(state_);
RETURN_IF_ERROR(state_->GetQueryStatus());
}
if (left.index() == right.index()) {
DCHECK_EQ(Compare(pivot_tuple_row, right.row()), 0);
}
// Move the tuples equal to the pivot from the two sides to the middle.
while (equals_left.index() > begin.index()) {
equals_left.Prev(run, tuple_size);
left.Prev(run, tuple_size);
Swap(equals_left.tuple(), left.tuple(), swap_tuple, tuple_size);
}
while (equals_right.index() < end.index()) {
right.Next(run, tuple_size);
Swap(equals_right.tuple(), right.tuple(), swap_tuple, tuple_size);
equals_right.Next(run, tuple_size);
}
*cut_left = left;
right.Next(run, tuple_size);
*cut_right = right;
return Status::OK();
}
// Sort the sequence of tuples from [begin, end).
// Begin with a sorted sequence of size 1 [begin, begin+1).
// During each pass of the outermost loop, add the next tuple (at position 'i') to
// the sorted sequence by comparing it to each element of the sorted sequence
// (reverse order) to find its correct place in the sorted sequence, copying tuples
// along the way.
Status IR_ALWAYS_INLINE Sorter::TupleSorter::InsertionSort(const TupleIterator& begin,
const TupleIterator& end) {
DCHECK_LT(begin.index(), end.index());
// Hoist member variable lookups out of loop to avoid extra loads inside loop.
Run* run = run_;
const int tuple_size = get_tuple_size();
uint8_t* temp_tuple_buffer = temp_tuple_buffer_;
TupleIterator insert_iter = begin;
insert_iter.Next(run, tuple_size);
for (; insert_iter.index() < end.index(); insert_iter.Next(run, tuple_size)) {
// insert_iter points to the tuple after the currently sorted sequence that must
// be inserted into the sorted sequence. Copy to temp_tuple_buffer_ since it may be
// overwritten by the one at position 'insert_iter - 1'.
memcpy(temp_tuple_buffer, insert_iter.tuple(), tuple_size);
// 'iter' points to the tuple that temp_tuple_buffer will be compared to.
// 'copy_to' is where the tuple at 'iter' should be copied if it is
// >= temp_tuple_buffer. 'copy_to' always points to the row just after 'iter'.
TupleIterator iter = insert_iter;
iter.Prev(run, tuple_size);
Tuple* copy_to = insert_iter.tuple();
while (Less(reinterpret_cast<TupleRow*>(&temp_tuple_buffer), iter.row())) {
memcpy(copy_to, iter.tuple(), tuple_size);
copy_to = iter.tuple();
// Break if 'iter' has reached the first row, meaning that the temp row
// will be inserted in position 'begin'
if (iter.index() <= begin.index()) break;
iter.Prev(run, tuple_size);
}
memcpy(copy_to, temp_tuple_buffer, tuple_size);
}
RETURN_IF_CANCELLED(state_);
RETURN_IF_ERROR(state_->GetQueryStatus());
return Status::OK();
}
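// Recursive quicksort driver for [begin, end). Ranges of at most
// INSERTION_THRESHOLD tuples are sorted with insertion sort instead.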
Status Sorter::TupleSorter::SortHelper(TupleIterator begin, TupleIterator end) {
// Use insertion sort for smaller sequences.
while (end.index() - begin.index() > INSERTION_THRESHOLD) {
// Select a pivot. If duplicates are found during pivot selection,
// 'has_equals' is set and Partition3way() is called to split the tuples in
// [begin, end) into three groups (< pivot, = pivot and > pivot) in-place.
// 'cut_left' is the index of the first tuple in the second group and
// 'cut_right' is the index of the first tuple in the third group.
// With many equal values, 3-way quicksort is much faster than 2-way;
// otherwise it is slightly slower.
bool has_equals = false;
Tuple* pivot = SelectPivot(begin, end, &has_equals);
TupleIterator cut_left;
TupleIterator cut_right;
if (has_equals) {
RETURN_IF_ERROR(Partition3way(begin, end, pivot, &cut_left, &cut_right));
} else {
// If no duplicates are found, call Partition2way() to split the tuples
// in [begin, end) into two groups (<= pivot and >= pivot) in-place.
// 'cut_left' is the index of the first tuple in the second group.
RETURN_IF_ERROR(Partition2way(begin, end, pivot, &cut_left));
cut_right = cut_left;
}
// Recurse on the smaller partition and loop on the larger one. This limits the
// recursion depth to O(log n) stack frames.
if (cut_left.index() - begin.index() < end.index() - cut_right.index()) {
// Left partition is smaller.
RETURN_IF_ERROR(SortHelper(begin, cut_left));
begin = cut_right;
} else {
// Right partition is equal or smaller.
RETURN_IF_ERROR(SortHelper(cut_right, end));
end = cut_left;
}
}
if (begin.index() < end.index()) RETURN_IF_ERROR(InsertionSort(begin, end));
return Status::OK();
}
Tuple* IR_ALWAYS_INLINE Sorter::TupleSorter::SelectPivot(
TupleIterator begin, TupleIterator end, bool* has_equals) {
// Select the median of three random tuples. The random selection avoids pathological
// behaviour associated with techniques that pick a fixed element (e.g. picking
// first/last/middle element) and taking the median tends to help us select better
// pivots that more evenly split the input range. This method makes selection of
// bad pivots very infrequent.
//
// To illustrate, if we define a bad pivot as one in the lower or upper 10% of values,
// then the median of three is a bad pivot only if all three randomly-selected values
// are in the lower or upper 10%. The probability of that is 0.2 * 0.2 * 0.2 = 0.008:
// less than 1%. Since selection is random each time, the chance of repeatedly picking
// bad pivots decreases exponentially and becomes negligibly small after a few
// iterations.
Tuple* tuples[3];
for (auto& tuple : tuples) {
int64_t index = boost::uniform_int<int64_t>(begin.index(), end.index() - 1)(rng_);
TupleIterator iter(run_, index);
DCHECK(iter.tuple() != nullptr);
tuple = iter.tuple();
}
return MedianOfThree(tuples[0], tuples[1], tuples[2], has_equals);
}
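// Returns the median of tuples 't1', 't2' and 't3' under the sort comparator
// and sets '*has_equals' if any two of them compare equal.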
Tuple* IR_ALWAYS_INLINE Sorter::TupleSorter::MedianOfThree(
Tuple* t1, Tuple* t2, Tuple* t3, bool* has_equals) {
TupleRow* tr1 = reinterpret_cast<TupleRow*>(&t1);
TupleRow* tr2 = reinterpret_cast<TupleRow*>(&t2);
TupleRow* tr3 = reinterpret_cast<TupleRow*>(&t3);
int t1_cmp_t2 = Compare(tr1, tr2);
int t2_cmp_t3 = Compare(tr2, tr3);
int t1_cmp_t3 = Compare(tr1, tr3);
Tuple* pivot;
if (t1_cmp_t2 < 0) {
// t1 < t2
if (t2_cmp_t3 < 0) {
// t1 < t2 < t3
pivot = t2;
} else if (t1_cmp_t3 <= 0) {
// t1 <= t3 <= t2
pivot = t3;
} else {
// t3 < t1 < t2
pivot = t1;
}
} else {
// t2 <= t1
if (t1_cmp_t3 < 0) {
// t2 <= t1 < t3
pivot = t1;
} else if (t2_cmp_t3 <= 0) {
// t2 <= t3 <= t1
pivot = t3;
} else {
// t3 < t2 <= t1
pivot = t2;
}
}
// A comparison returning 0 means duplicates of the pivot exist in the input.
*has_equals = t1_cmp_t2 == 0 || t1_cmp_t3 == 0 || t2_cmp_t3 == 0;
return pivot;
}
// IMPALA-3816: Function is not inlined into Partition() without IR_ALWAYS_INLINE hint.
void IR_ALWAYS_INLINE Sorter::TupleSorter::Swap(Tuple* RESTRICT left,
Tuple* RESTRICT right, Tuple* RESTRICT swap_tuple, int tuple_size) {
memcpy(swap_tuple, left, tuple_size);
memcpy(left, right, tuple_size);
memcpy(right, swap_tuple, tuple_size);
}
} // namespace impala