// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/sorter-internal.h"
#include <boost/random/uniform_int.hpp>
#include "runtime/exec-env.h"
#include "runtime/runtime-state.h"
#include "util/runtime-profile-counters.h"
namespace impala {
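
// Advance the iterator to the first tuple of the next fixed-length page. Called by
// Next() when the iterator moves past the end of the current page.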
void Sorter::TupleIterator::NextPage(Sorter::Run* run) {
// When moving after the last tuple, stay at the last page.
if (index_ >= run->num_tuples()) return;
++page_index_;
DCHECK_LT(page_index_, run->fixed_len_pages_.size());
buffer_start_index_ = page_index_ * run->page_capacity_;
DCHECK_EQ(index_, buffer_start_index_);
buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
tuple_ = run->fixed_len_pages_[page_index_].data();
}
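
// Move the iterator to the last tuple of the previous fixed-length page. Called by
// Prev() when the iterator moves before the start of the current page.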
void Sorter::TupleIterator::PrevPage(Sorter::Run* run) {
// When moving before the first tuple, stay at the first page.
if (index_ < 0) return;
--page_index_;
DCHECK_GE(page_index_, 0);
buffer_start_index_ = page_index_ * run->page_capacity_;
buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
DCHECK_EQ(index_, buffer_end_index_ - 1);
int last_tuple_page_offset = run->sort_tuple_size_ * (run->page_capacity_ - 1);
tuple_ = run->fixed_len_pages_[page_index_].data() + last_tuple_page_offset;
}
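
// Advance the iterator by one tuple of 'tuple_size' bytes, crossing into the next
// page if the end of the current page is reached.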
void Sorter::TupleIterator::Next(Sorter::Run* run, int tuple_size) {
DCHECK_LT(index_, run->num_tuples()) << "Can only advance one past end of run";
tuple_ += tuple_size;
++index_;
if (UNLIKELY(index_ >= buffer_end_index_)) NextPage(run);
}
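
// Move the iterator back by one tuple of 'tuple_size' bytes, crossing into the
// previous page if the start of the current page is passed.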
void Sorter::TupleIterator::Prev(Sorter::Run* run, int tuple_size) {
DCHECK_GE(index_, 0) << "Can only retreat one before start of run";
tuple_ -= tuple_size;
--index_;
if (UNLIKELY(index_ < buffer_start_index_)) PrevPage(run);
}
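
// Compare two rows with the embedded comparator. Comparisons may make local memory
// allocations for expression results, so every 'batch_size' comparisons the
// accumulated allocations in 'expr_results_pool_' are freed to bound memory usage.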
bool Sorter::TupleSorter::Less(const TupleRow* lhs, const TupleRow* rhs) {
--num_comparisons_till_free_;
DCHECK_GE(num_comparisons_till_free_, 0);
if (UNLIKELY(num_comparisons_till_free_ == 0)) {
parent_->expr_results_pool_.Clear();
num_comparisons_till_free_ = state_->batch_size();
}
return comparator_.Less(lhs, rhs);
}
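
// Hoare-style partition of the tuples in [begin, end) around 'pivot': scan inward
// from both ends, swapping out-of-place tuples, until the two iterators cross.
// Returns early with a non-OK status if the query is cancelled or has failed.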
Status Sorter::TupleSorter::Partition(TupleIterator begin,
TupleIterator end, const Tuple* pivot, TupleIterator* cut) {
// Hoist member variable lookups out of loop to avoid extra loads inside loop.
Run* run = run_;
int tuple_size = tuple_size_;
Tuple* temp_tuple = reinterpret_cast<Tuple*>(temp_tuple_buffer_);
Tuple* swap_tuple = reinterpret_cast<Tuple*>(swap_buffer_);
// Copy pivot into temp_tuple since it points to a tuple within [begin, end).
DCHECK(temp_tuple != nullptr);
DCHECK(pivot != nullptr);
memcpy(temp_tuple, pivot, tuple_size);
TupleIterator left = begin;
TupleIterator right = end;
right.Prev(run, tuple_size); // Set 'right' to the last tuple in range.
while (true) {
// Search for the first and last out-of-place elements, and swap them.
while (Less(left.row(), reinterpret_cast<TupleRow*>(&temp_tuple))) {
left.Next(run, tuple_size);
}
while (Less(reinterpret_cast<TupleRow*>(&temp_tuple), right.row())) {
right.Prev(run, tuple_size);
}
if (left.index() >= right.index()) break;
// Swap first and last tuples.
Swap(left.tuple(), right.tuple(), swap_tuple, tuple_size);
left.Next(run, tuple_size);
right.Prev(run, tuple_size);
RETURN_IF_CANCELLED(state_);
RETURN_IF_ERROR(state_->GetQueryStatus());
}
*cut = left;
return Status::OK();
}
// Sort the sequence of tuples from [begin, end).
// Begin with a sorted sequence of size 1 [begin, begin+1).
// During each pass of the outermost loop, add the next tuple (at position 'i') to
// the sorted sequence by comparing it to each element of the sorted sequence
// (reverse order) to find its correct place in the sorted sequence, copying tuples
// along the way.
Status Sorter::TupleSorter::InsertionSort(const TupleIterator& begin,
const TupleIterator& end) {
DCHECK_LT(begin.index(), end.index());
// Hoist member variable lookups out of loop to avoid extra loads inside loop.
Run* run = run_;
int tuple_size = tuple_size_;
uint8_t* temp_tuple_buffer = temp_tuple_buffer_;
TupleIterator insert_iter = begin;
insert_iter.Next(run, tuple_size);
for (; insert_iter.index() < end.index(); insert_iter.Next(run, tuple_size)) {
// insert_iter points to the tuple after the currently sorted sequence that must
// be inserted into the sorted sequence. Copy to temp_tuple_buffer_ since it may be
// overwritten by the one at position 'insert_iter - 1'.
memcpy(temp_tuple_buffer, insert_iter.tuple(), tuple_size);
// 'iter' points to the tuple that temp_tuple_buffer will be compared to.
// 'copy_to' is where the tuple at 'iter' should be copied if it compares >= the
// tuple in temp_tuple_buffer; 'copy_to' always points to the next row after 'iter'.
TupleIterator iter = insert_iter;
iter.Prev(run, tuple_size);
Tuple* copy_to = insert_iter.tuple();
while (Less(reinterpret_cast<TupleRow*>(&temp_tuple_buffer), iter.row())) {
memcpy(copy_to, iter.tuple(), tuple_size);
copy_to = iter.tuple();
// Break if 'iter' has reached the first row, meaning that the temp row will be
// inserted at position 'begin'.
if (iter.index() <= begin.index()) break;
iter.Prev(run, tuple_size);
}
memcpy(copy_to, temp_tuple_buffer, tuple_size);
}
RETURN_IF_CANCELLED(state_);
RETURN_IF_ERROR(state_->GetQueryStatus());
return Status::OK();
}
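
// Hybrid quicksort of [begin, end): partition around a randomized median-of-three
// pivot, recurse into the smaller partition and loop on the larger one so that the
// stack stays at most O(log n) deep, and finish short ranges with insertion sort.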
Status Sorter::TupleSorter::SortHelper(TupleIterator begin, TupleIterator end) {
// Use insertion sort for smaller sequences.
while (end.index() - begin.index() > INSERTION_THRESHOLD) {
// Select a pivot and call Partition() to split the tuples in [begin, end) into two
// groups (<= pivot and >= pivot) in-place. 'cut' is the index of the first tuple in
// the second group.
Tuple* pivot = SelectPivot(begin, end);
TupleIterator cut;
RETURN_IF_ERROR(Partition(begin, end, pivot, &cut));
// Recurse on the smaller partition. This limits stack size to log(n) stack frames.
if (cut.index() - begin.index() < end.index() - cut.index()) {
// Left partition is smaller.
RETURN_IF_ERROR(SortHelper(begin, cut));
begin = cut;
} else {
// Right partition is equal or smaller.
RETURN_IF_ERROR(SortHelper(cut, end));
end = cut;
}
}
if (begin.index() < end.index()) RETURN_IF_ERROR(InsertionSort(begin, end));
return Status::OK();
}
Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator begin, TupleIterator end) {
// Select the median of three random tuples. The random selection avoids pathological
// behaviour associated with techniques that pick a fixed element (e.g. picking
// first/last/middle element) and taking the median tends to help us select better
// pivots that more evenly split the input range. This method makes selection of
// bad pivots very infrequent.
//
// To illustrate, if we define a bad pivot as one in the lower or upper 10% of values,
// then the median of three is a bad pivot only if at least two of the three
// randomly-selected values fall in the same 10% tail. The probability of that is
// 2 * (3 * 0.1^2 * 0.9 + 0.1^3) = 0.056: less than 6%. Since selection is random
// each time, the chance of repeatedly picking bad pivots decreases exponentially and
// becomes negligibly small after a few iterations.
Tuple* tuples[3];
for (auto& tuple : tuples) {
int64_t index = boost::uniform_int<int64_t>(begin.index(), end.index() - 1)(rng_);
TupleIterator iter(run_, index);
DCHECK(iter.tuple() != nullptr);
tuple = iter.tuple();
}
return MedianOfThree(tuples[0], tuples[1], tuples[2]);
}
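
// Return the median of 't1', 't2' and 't3' using three Less() comparisons. Each
// Tuple pointer is reinterpreted as a single-tuple TupleRow for the comparator.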
Tuple* Sorter::TupleSorter::MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3) {
TupleRow* tr1 = reinterpret_cast<TupleRow*>(&t1);
TupleRow* tr2 = reinterpret_cast<TupleRow*>(&t2);
TupleRow* tr3 = reinterpret_cast<TupleRow*>(&t3);
bool t1_lt_t2 = Less(tr1, tr2);
bool t2_lt_t3 = Less(tr2, tr3);
bool t1_lt_t3 = Less(tr1, tr3);
if (t1_lt_t2) {
// t1 < t2
if (t2_lt_t3) {
// t1 < t2 < t3
return t2;
} else if (t1_lt_t3) {
// t1 < t3 <= t2
return t3;
} else {
// t3 <= t1 < t2
return t1;
}
} else {
// t2 <= t1
if (t1_lt_t3) {
// t2 <= t1 < t3
return t1;
} else if (t2_lt_t3) {
// t2 < t3 <= t1
return t3;
} else {
// t3 <= t2 <= t1
return t2;
}
}
}
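
// Swap the tuples pointed to by 'left' and 'right', using 'swap_tuple' as
// scratch space.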
void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tuple,
int tuple_size) {
memcpy(swap_tuple, left, tuple_size);
memcpy(left, right, tuple_size);
memcpy(right, swap_tuple, tuple_size);
}
} // namespace impala