// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/sorter-internal.h"
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int.hpp>
#include <gutil/strings/substitute.h>
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/runtime-state.h"
#include "runtime/sorted-run-merger.h"
#include "util/pretty-printer.h"
#include "util/ubsan.h"
#include "common/names.h"
using boost::uniform_int;
using boost::mt19937_64;
using namespace strings;
namespace impala {
// Number of pinned pages required for a merge with fixed-length data only.
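// A minimal merge reads from two input runs and writes one output run, each of
// which needs one pinned page when all data is fixed-length.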
const int MIN_BUFFERS_PER_MERGE = 3;
Status Sorter::Page::Init(Sorter* sorter) {
const BufferPool::BufferHandle* page_buffer;
RETURN_IF_ERROR(pool()->CreatePage(sorter->buffer_pool_client_, sorter->page_len_,
&handle_, &page_buffer));
data_ = page_buffer->data();
return Status::OK();
}
BufferPool::BufferHandle Sorter::Page::ExtractBuffer(BufferPool::ClientHandle* client) {
DCHECK(data_ != nullptr) << "Page must be in memory";
BufferPool::BufferHandle buffer;
Status status = pool()->ExtractBuffer(client, &handle_, &buffer);
DCHECK(status.ok()) << "Page was in memory, ExtractBuffer() shouldn't fail";
Reset();
return buffer;
}
uint8_t* Sorter::Page::AllocateBytes(int64_t len) {
DCHECK_GE(len, 0);
DCHECK_LE(len, BytesRemaining());
DCHECK(data_ != nullptr);
uint8_t* result = data_ + valid_data_len_;
valid_data_len_ += len;
return result;
}
void Sorter::Page::FreeBytes(int64_t len) {
DCHECK_GE(len, 0);
DCHECK_LE(len, valid_data_len_);
DCHECK(data_ != nullptr);
valid_data_len_ -= len;
}
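// A pinned page's buffer may not be resident yet if Pin() started an
// asynchronous read from disk; GetBuffer() blocks until the buffer is
// available in memory.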
Status Sorter::Page::WaitForBuffer() {
DCHECK(handle_.is_pinned());
if (data_ != nullptr) return Status::OK();
const BufferPool::BufferHandle* page_buffer;
RETURN_IF_ERROR(handle_.GetBuffer(&page_buffer));
data_ = page_buffer->data();
return Status::OK();
}
Status Sorter::Page::Pin(BufferPool::ClientHandle* client) {
DCHECK(!handle_.is_pinned());
return pool()->Pin(client, &handle_);
}
void Sorter::Page::Unpin(BufferPool::ClientHandle* client) {
pool()->Unpin(client, &handle_);
data_ = nullptr;
}
void Sorter::Page::Close(BufferPool::ClientHandle* client) {
pool()->DestroyPage(client, &handle_);
Reset();
}
void Sorter::Page::Reset() {
DCHECK(!handle_.is_open());
valid_data_len_ = 0;
data_ = nullptr;
}
BufferPool* Sorter::Page::pool() {
return ExecEnv::GetInstance()->buffer_pool();
}
// Sorter::Run methods
Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run)
: sorter_(parent),
sort_tuple_desc_(sort_tuple_desc),
sort_tuple_size_(sort_tuple_desc->byte_size()),
page_capacity_(parent->page_len_ / sort_tuple_size_),
has_var_len_slots_(sort_tuple_desc->HasVarlenSlots()),
initial_run_(initial_run),
is_pinned_(initial_run),
is_finalized_(false),
is_sorted_(!initial_run),
num_tuples_(0) {}
Status Sorter::Run::Init() {
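// One page is needed for fixed-len data, plus one for var-len data if the
// tuple has var-len slots, plus an extra var-len copy page that
// UnpinAllPages() uses to reorder var-len data in spillable initial runs.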
int num_to_create = 1 + has_var_len_slots_
+ (has_var_len_slots_ && initial_run_ && sorter_->enable_spilling_);
int64_t required_mem = num_to_create * sorter_->page_len_;
if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
return Status(Substitute(
"Unexpected error trying to reserve $0 bytes for a sorted run: $1",
required_mem, sorter_->buffer_pool_client_->DebugString()));
}
RETURN_IF_ERROR(AddPage(&fixed_len_pages_));
if (has_var_len_slots_) {
RETURN_IF_ERROR(AddPage(&var_len_pages_));
if (initial_run_ && sorter_->enable_spilling_) {
// Need an additional var-len page to reorder var-len data in UnpinAllPages().
RETURN_IF_ERROR(var_len_copy_page_.Init(sorter_));
}
}
if (initial_run_) {
sorter_->initial_runs_counter_->Add(1);
} else {
sorter_->spilled_runs_counter_->Add(1);
}
return Status::OK();
}
template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
Status Sorter::Run::AddBatchInternal(
RowBatch* batch, int start_index, int* num_processed) {
DCHECK(!is_finalized_);
DCHECK(!fixed_len_pages_.empty());
DCHECK_EQ(HAS_VAR_LEN_SLOTS, has_var_len_slots_);
DCHECK_EQ(INITIAL_RUN, initial_run_);
*num_processed = 0;
Page* cur_fixed_len_page = &fixed_len_pages_.back();
if (!INITIAL_RUN) {
// For intermediate merges, the input row is the sort tuple.
DCHECK_EQ(batch->row_desc()->tuple_descriptors().size(), 1);
DCHECK_EQ(batch->row_desc()->tuple_descriptors()[0], sort_tuple_desc_);
}
// Keep initial unsorted runs pinned in memory so we can sort them.
const AddPageMode add_mode = INITIAL_RUN ? KEEP_PREV_PINNED : UNPIN_PREV;
// Input rows are copied/materialized into tuples allocated in fixed_len_pages_.
// The variable length column data are copied into pages stored in var_len_pages_.
// Input row processing is split into two loops.
// The inner loop processes as many input rows as will fit in cur_fixed_len_page.
// The outer loop allocates a new page for fixed-len data if the input batch is
// not exhausted.
// cur_input_index is the index into the input 'batch' of the current input row being
// processed.
int cur_input_index = start_index;
vector<StringValue*> string_values;
string_values.reserve(sort_tuple_desc_->string_slots().size());
while (cur_input_index < batch->num_rows()) {
// tuples_remaining is the number of tuples to copy/materialize into
// cur_fixed_len_page.
int tuples_remaining = cur_fixed_len_page->BytesRemaining() / sort_tuple_size_;
tuples_remaining = min(batch->num_rows() - cur_input_index, tuples_remaining);
for (int i = 0; i < tuples_remaining; ++i) {
int total_var_len = 0;
TupleRow* input_row = batch->GetRow(cur_input_index);
Tuple* new_tuple =
reinterpret_cast<Tuple*>(cur_fixed_len_page->AllocateBytes(sort_tuple_size_));
if (INITIAL_RUN) {
new_tuple->MaterializeExprs<HAS_VAR_LEN_SLOTS, true>(input_row,
*sort_tuple_desc_, sorter_->sort_tuple_expr_evals_, nullptr,
&string_values, &total_var_len);
if (total_var_len > sorter_->page_len_) {
int64_t max_row_size = sorter_->state_->query_options().max_row_size;
return Status(TErrorCode::MAX_ROW_SIZE,
PrettyPrinter::Print(total_var_len, TUnit::BYTES), sorter_->node_label_,
PrettyPrinter::Print(max_row_size, TUnit::BYTES));
}
} else {
memcpy(new_tuple, input_row->GetTuple(0), sort_tuple_size_);
if (HAS_VAR_LEN_SLOTS) {
CollectNonNullVarSlots(new_tuple, &string_values, &total_var_len);
}
}
if (HAS_VAR_LEN_SLOTS) {
DCHECK_GT(var_len_pages_.size(), 0);
Page* cur_var_len_page = &var_len_pages_.back();
if (cur_var_len_page->BytesRemaining() < total_var_len) {
bool added;
RETURN_IF_ERROR(TryAddPage(add_mode, &var_len_pages_, &added));
if (added) {
cur_var_len_page = &var_len_pages_.back();
} else {
// There was not enough space in the last var-len page for this tuple, and
// the run could not be extended. Return the fixed-len allocation and exit.
cur_fixed_len_page->FreeBytes(sort_tuple_size_);
return Status::OK();
}
}
// Sorting of tuples containing array values is not implemented. The planner
// combined with projection should guarantee that no tuple contains any.
for (const SlotDescriptor* coll_slot: sort_tuple_desc_->collection_slots()) {
DCHECK(new_tuple->IsNull(coll_slot->null_indicator_offset()));
}
uint8_t* var_data_ptr = cur_var_len_page->AllocateBytes(total_var_len);
if (INITIAL_RUN) {
CopyVarLenData(string_values, var_data_ptr);
} else {
DCHECK_EQ(&var_len_pages_.back(), cur_var_len_page);
CopyVarLenDataConvertOffset(string_values, var_len_pages_.size() - 1,
cur_var_len_page->data(), var_data_ptr);
}
}
++num_tuples_;
++*num_processed;
++cur_input_index;
}
// If there are still rows left to process, get a new page for the fixed-length
// tuples. If the run is already too long, return.
if (cur_input_index < batch->num_rows()) {
bool added;
RETURN_IF_ERROR(TryAddPage(add_mode, &fixed_len_pages_, &added));
if (!added) return Status::OK();
cur_fixed_len_page = &fixed_len_pages_.back();
}
}
return Status::OK();
}
Status Sorter::Run::FinalizeInput() {
DCHECK(!is_finalized_);
RETURN_IF_ERROR(FinalizePages(&fixed_len_pages_));
if (has_var_len_slots_) {
RETURN_IF_ERROR(FinalizePages(&var_len_pages_));
}
is_finalized_ = true;
return Status::OK();
}
Status Sorter::Run::FinalizePages(vector<Page>* pages) {
DCHECK_GT(pages->size(), 0);
Page* last_page = &pages->back();
if (last_page->valid_data_len() > 0) {
DCHECK_EQ(initial_run_, is_pinned_);
if (!is_pinned_) {
// Unpin the last page of this unpinned run. We've finished writing the run so
// all pages in the run can now be unpinned.
last_page->Unpin(sorter_->buffer_pool_client_);
}
} else {
last_page->Close(sorter_->buffer_pool_client_);
pages->pop_back();
}
return Status::OK();
}
void Sorter::Run::CloseAllPages() {
DeleteAndClearPages(&fixed_len_pages_);
DeleteAndClearPages(&var_len_pages_);
if (var_len_copy_page_.is_open()) {
var_len_copy_page_.Close(sorter_->buffer_pool_client_);
}
}
Status Sorter::Run::UnpinAllPages() {
DCHECK(is_sorted_);
DCHECK(initial_run_);
DCHECK(is_pinned_);
DCHECK(is_finalized_);
// A list of var len pages to replace 'var_len_pages_'. Note that after we are done
// we may have a different number of pages, because internal fragmentation may leave
// slightly different amounts of wasted space at the end of each page.
// We need to be careful to clean up these pages if we run into an error in this method.
vector<Page> sorted_var_len_pages;
sorted_var_len_pages.reserve(var_len_pages_.size());
vector<StringValue*> string_values;
int total_var_len;
string_values.reserve(sort_tuple_desc_->string_slots().size());
Page* cur_sorted_var_len_page = nullptr;
if (HasVarLenPages()) {
DCHECK(var_len_copy_page_.is_open());
sorted_var_len_pages.push_back(move(var_len_copy_page_));
cur_sorted_var_len_page = &sorted_var_len_pages.back();
} else if (has_var_len_slots_) {
// If we don't have any var-len pages, clean up the copy page.
DCHECK(var_len_copy_page_.is_open());
var_len_copy_page_.Close(sorter_->buffer_pool_client_);
} else {
DCHECK(!var_len_copy_page_.is_open());
}
Status status;
for (auto& fixed_len_page : fixed_len_pages_) {
Page* cur_fixed_page = &fixed_len_page;
// Skip converting the pointers if there are no var-len slots, or if all the
// values are null or zero-length. This may leave the pointers of zero-length
// strings pointing at arbitrary memory, but zero-length data cannot be
// dereferenced anyway.
if (HasVarLenPages()) {
for (int page_offset = 0; page_offset < cur_fixed_page->valid_data_len();
page_offset += sort_tuple_size_) {
Tuple* cur_tuple = reinterpret_cast<Tuple*>(cur_fixed_page->data() + page_offset);
CollectNonNullVarSlots(cur_tuple, &string_values, &total_var_len);
DCHECK(cur_sorted_var_len_page->is_open());
if (cur_sorted_var_len_page->BytesRemaining() < total_var_len) {
bool added;
status = TryAddPage(UNPIN_PREV, &sorted_var_len_pages, &added);
if (!status.ok()) goto cleanup_pages;
DCHECK(added) << "TryAddPage() with UNPIN_PREV should not fail to add";
cur_sorted_var_len_page = &sorted_var_len_pages.back();
}
uint8_t* var_data_ptr = cur_sorted_var_len_page->AllocateBytes(total_var_len);
DCHECK_EQ(&sorted_var_len_pages.back(), cur_sorted_var_len_page);
CopyVarLenDataConvertOffset(string_values, sorted_var_len_pages.size() - 1,
cur_sorted_var_len_page->data(), var_data_ptr);
}
}
cur_fixed_page->Unpin(sorter_->buffer_pool_client_);
}
if (HasVarLenPages()) {
DCHECK_GT(sorted_var_len_pages.back().valid_data_len(), 0);
sorted_var_len_pages.back().Unpin(sorter_->buffer_pool_client_);
}
// Clear var_len_pages_ and replace it with the contents of 'sorted_var_len_pages'.
DeleteAndClearPages(&var_len_pages_);
sorted_var_len_pages.swap(var_len_pages_);
is_pinned_ = false;
sorter_->spilled_runs_counter_->Add(1);
return Status::OK();
cleanup_pages:
DeleteAndClearPages(&sorted_var_len_pages);
return status;
}
Status Sorter::Run::PrepareRead() {
DCHECK(is_finalized_);
DCHECK(is_sorted_);
fixed_len_pages_index_ = 0;
fixed_len_page_offset_ = 0;
var_len_pages_index_ = 0;
end_of_fixed_len_page_ = end_of_var_len_page_ = fixed_len_pages_.empty();
num_tuples_returned_ = 0;
buffered_batch_.reset(new RowBatch(
sorter_->output_row_desc_, sorter_->state_->batch_size(), sorter_->mem_tracker_));
// If the run is pinned, all pages are already pinned, so we're ready to read.
if (is_pinned_) return Status::OK();
// Pins the first fixed-len page.
if (!fixed_len_pages_.empty()) {
RETURN_IF_ERROR(fixed_len_pages_[0].Pin(sorter_->buffer_pool_client_));
}
// Pins the first var-len page, if there is one.
if (HasVarLenPages()) {
RETURN_IF_ERROR(var_len_pages_[0].Pin(sorter_->buffer_pool_client_));
}
return Status::OK();
}
Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
DCHECK(buffered_batch_ != nullptr);
buffered_batch_->Reset();
// Fill more rows into buffered_batch_.
bool eos;
if (HasVarLenPages() && !is_pinned_) {
RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos));
} else {
RETURN_IF_ERROR(GetNext<false>(buffered_batch_.get(), &eos));
}
if (eos) {
// Setting output_batch to nullptr signals eos to the caller, so GetNext() is not
// allowed to attach resources to the batch on eos.
DCHECK_EQ(buffered_batch_->num_rows(), 0);
DCHECK_EQ(buffered_batch_->num_buffers(), 0);
*output_batch = nullptr;
return Status::OK();
}
*output_batch = buffered_batch_.get();
return Status::OK();
}
template <bool CONVERT_OFFSET_TO_PTR>
Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
// Var-len offsets are converted only when reading var-len data from unpinned runs.
// We shouldn't convert var-len offsets if there are no pages, since in that case
// they must all be null or zero-length strings, which don't point into a valid page.
DCHECK_EQ(CONVERT_OFFSET_TO_PTR, HasVarLenPages() && !is_pinned_);
if (end_of_fixed_len_page_
&& fixed_len_pages_index_ >= static_cast<int>(fixed_len_pages_.size()) - 1) {
// All pages were previously attached to output batches. GetNextBatch() assumes
// that we don't attach resources to the batch on eos.
DCHECK_EQ(NumOpenPages(fixed_len_pages_), 0);
DCHECK_EQ(NumOpenPages(var_len_pages_), 0);
CloseAllPages();
*eos = true;
DCHECK_EQ(num_tuples_returned_, num_tuples_);
return Status::OK();
}
// Advance the fixed or var len page if we reached the end in the previous call to
// GetNext().
if (end_of_fixed_len_page_) {
RETURN_IF_ERROR(PinNextReadPage(&fixed_len_pages_, fixed_len_pages_index_));
++fixed_len_pages_index_;
fixed_len_page_offset_ = 0;
end_of_fixed_len_page_ = false;
}
if (end_of_var_len_page_) {
RETURN_IF_ERROR(PinNextReadPage(&var_len_pages_, var_len_pages_index_));
++var_len_pages_index_;
end_of_var_len_page_ = false;
}
// Fills rows into the output batch until a page boundary is reached.
Page* fixed_len_page = &fixed_len_pages_[fixed_len_pages_index_];
DCHECK(fixed_len_page != nullptr);
// Ensure we have a reference to the fixed-length page's buffer.
RETURN_IF_ERROR(fixed_len_page->WaitForBuffer());
// If we're converting offsets into unpinned var-len pages, make sure the
// current var-len page is in memory.
if (CONVERT_OFFSET_TO_PTR && HasVarLenPages()) {
RETURN_IF_ERROR(var_len_pages_[var_len_pages_index_].WaitForBuffer());
}
while (!output_batch->AtCapacity()
&& fixed_len_page_offset_ < fixed_len_page->valid_data_len()) {
DCHECK(fixed_len_page != nullptr);
Tuple* input_tuple =
reinterpret_cast<Tuple*>(fixed_len_page->data() + fixed_len_page_offset_);
if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) {
DCHECK(!is_pinned_);
// The var-len data is in the next page. We are done with the current page, so
// return rows we've accumulated so far along with the page's buffer and advance
// to the next page in the next GetNext() call. We need the page's reservation to
// pin the next page, so flush resources.
DCHECK_GE(var_len_pages_index_, 0);
DCHECK_LT(var_len_pages_index_, var_len_pages_.size());
BufferPool::BufferHandle buffer =
var_len_pages_[var_len_pages_index_].ExtractBuffer(
sorter_->buffer_pool_client_);
output_batch->AddBuffer(sorter_->buffer_pool_client_, move(buffer),
RowBatch::FlushMode::FLUSH_RESOURCES);
end_of_var_len_page_ = true;
break;
}
output_batch->GetRow(output_batch->AddRow())->SetTuple(0, input_tuple);
output_batch->CommitLastRow();
fixed_len_page_offset_ += sort_tuple_size_;
++num_tuples_returned_;
}
if (fixed_len_page_offset_ >= fixed_len_page->valid_data_len()) {
// Reached the page boundary, need to move to the next page.
BufferPool::ClientHandle* client = sorter_->buffer_pool_client_;
// Attach page to batch. For unpinned pages we need to flush resource so we can pin
// the next page on the next call to GetNext().
RowBatch::FlushMode flush = is_pinned_ ? RowBatch::FlushMode::NO_FLUSH_RESOURCES :
RowBatch::FlushMode::FLUSH_RESOURCES;
output_batch->AddBuffer(
client, fixed_len_pages_[fixed_len_pages_index_].ExtractBuffer(client), flush);
// Attach remaining var-len pages at eos once no more rows will reference the pages.
if (fixed_len_pages_index_ == fixed_len_pages_.size() - 1) {
for (Page& var_len_page : var_len_pages_) {
DCHECK(!is_pinned_ || var_len_page.is_open());
if (var_len_page.is_open()) {
output_batch->AddBuffer(client, var_len_page.ExtractBuffer(client),
RowBatch::FlushMode::NO_FLUSH_RESOURCES);
}
}
var_len_pages_.clear();
}
end_of_fixed_len_page_ = true;
}
*eos = false;
return Status::OK();
}
Status Sorter::Run::PinNextReadPage(vector<Page>* pages, int page_index) {
DCHECK_GE(page_index, 0);
DCHECK_LT(page_index, pages->size() - 1);
Page* curr_page = &(*pages)[page_index];
Page* next_page = &(*pages)[page_index + 1];
DCHECK_EQ(is_pinned_, next_page->is_pinned());
// The current page was attached to a batch.
DCHECK(!curr_page->is_open());
// 'next_page' is already pinned if the whole stream is pinned.
if (is_pinned_) return Status::OK();
RETURN_IF_ERROR(next_page->Pin(sorter_->buffer_pool_client_));
return Status::OK();
}
void Sorter::Run::CollectNonNullVarSlots(Tuple* src,
vector<StringValue*>* string_values, int* total_var_len) {
string_values->clear();
*total_var_len = 0;
for (const SlotDescriptor* string_slot: sort_tuple_desc_->string_slots()) {
if (!src->IsNull(string_slot->null_indicator_offset())) {
StringValue* string_val =
reinterpret_cast<StringValue*>(src->GetSlot(string_slot->tuple_offset()));
string_values->push_back(string_val);
*total_var_len += string_val->len;
}
}
}
Status Sorter::Run::TryAddPage(
AddPageMode mode, vector<Page>* page_sequence, bool* added) {
DCHECK(!page_sequence->empty());
if (mode == KEEP_PREV_PINNED) {
if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(sorter_->page_len_)) {
*added = false;
return Status::OK();
}
} else {
DCHECK(mode == UNPIN_PREV);
// Unpin the prev page to free up the memory required to pin the next page.
page_sequence->back().Unpin(sorter_->buffer_pool_client_);
}
RETURN_IF_ERROR(AddPage(page_sequence));
*added = true;
return Status::OK();
}
Status Sorter::Run::AddPage(vector<Page>* page_sequence) {
Page new_page;
RETURN_IF_ERROR(new_page.Init(sorter_));
page_sequence->push_back(move(new_page));
return Status::OK();
}
void Sorter::Run::CopyVarLenData(const vector<StringValue*>& string_values,
uint8_t* dest) {
for (StringValue* string_val: string_values) {
Ubsan::MemCpy(dest, string_val->ptr, string_val->len);
string_val->ptr = reinterpret_cast<char*>(dest);
dest += string_val->len;
}
}
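// Copies var-len data into 'dest' like CopyVarLenData(), but instead of storing
// a pointer in each StringValue, stores a packed value with the page index in
// the upper 32 bits and the page offset in the lower 32 bits, to be decoded
// later by ConvertOffsetsToPtrs().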
void Sorter::Run::CopyVarLenDataConvertOffset(const vector<StringValue*>& string_values,
int page_index, const uint8_t* page_start, uint8_t* dest) {
DCHECK_GE(page_index, 0);
DCHECK_GE(dest - page_start, 0);
for (StringValue* string_val : string_values) {
memcpy(dest, string_val->ptr, string_val->len);
DCHECK_LE(dest - page_start, sorter_->page_len_);
DCHECK_LE(dest - page_start, numeric_limits<uint32_t>::max());
uint32_t page_offset = dest - page_start;
uint64_t packed_offset = (static_cast<uint64_t>(page_index) << 32) | page_offset;
string_val->ptr = reinterpret_cast<char*>(packed_offset);
dest += string_val->len;
}
}
bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
// We need to be careful to handle the case where var_len_pages_ is empty,
// e.g. if all string values are null or zero-length.
uint8_t* page_start =
var_len_pages_.empty() ? nullptr : var_len_pages_[var_len_pages_index_].data();
const vector<SlotDescriptor*>& string_slots = sort_tuple_desc_->string_slots();
int num_non_null_string_slots = 0;
for (auto slot_desc : string_slots) {
if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
++num_non_null_string_slots;
DCHECK(slot_desc->type().IsVarLenStringType());
StringValue* value = reinterpret_cast<StringValue*>(
tuple->GetSlot(slot_desc->tuple_offset()));
// packed_offset includes the page index in the upper 32 bits and the page
// offset in the lower 32 bits. See CopyVarLenDataConvertOffset().
uint64_t packed_offset = reinterpret_cast<uint64_t>(value->ptr);
uint32_t page_index = packed_offset >> 32;
uint32_t page_offset = packed_offset & 0xFFFFFFFF;
if (page_index > var_len_pages_index_) {
// We've reached the page boundary for the current var-len page.
// This tuple will be returned in the next call to GetNext().
DCHECK_GE(page_index, 0);
DCHECK_LE(page_index, var_len_pages_.size());
DCHECK_EQ(page_index, var_len_pages_index_ + 1);
DCHECK_EQ(page_offset, 0); // The data is the first thing in the next page.
// This must be the first slot with var-len data for the tuple. Var-len data
// for a tuple shouldn't be split across pages.
DCHECK_EQ(num_non_null_string_slots, 1);
return false;
}
DCHECK_EQ(page_index, var_len_pages_index_);
if (var_len_pages_.empty()) {
DCHECK_EQ(value->len, 0);
} else {
DCHECK_LE(page_offset + value->len, var_len_pages_[page_index].valid_data_len());
}
// Calculate the address implied by the offset and assign it. May be nullptr for
// zero-length strings if there are no pages in the run since page_start is nullptr.
DCHECK(page_start != nullptr || page_offset == 0);
value->ptr = reinterpret_cast<char*>(page_start + page_offset);
}
return true;
}
int64_t Sorter::Run::TotalBytes() const {
int64_t total_bytes = 0;
for (const Page& page : fixed_len_pages_) {
if (page.is_open()) total_bytes += page.valid_data_len();
}
for (const Page& page : var_len_pages_) {
if (page.is_open()) total_bytes += page.valid_data_len();
}
return total_bytes;
}
Status Sorter::Run::AddInputBatch(RowBatch* batch, int start_index, int* num_processed) {
DCHECK(initial_run_);
if (has_var_len_slots_) {
return AddBatchInternal<true, true>(batch, start_index, num_processed);
} else {
return AddBatchInternal<false, true>(batch, start_index, num_processed);
}
}
Status Sorter::Run::AddIntermediateBatch(
RowBatch* batch, int start_index, int* num_processed) {
DCHECK(!initial_run_);
if (has_var_len_slots_) {
return AddBatchInternal<true, false>(batch, start_index, num_processed);
} else {
return AddBatchInternal<false, false>(batch, start_index, num_processed);
}
}
void Sorter::Run::CleanupRuns(deque<Sorter::Run*>* runs) {
for (Run* run : *runs) {
run->CloseAllPages();
}
runs->clear();
}
int Sorter::Run::NumOpenPages(const vector<Sorter::Page>& pages) {
int count = 0;
for (const Page& page : pages) {
if (page.is_open()) ++count;
}
return count;
}
void Sorter::Run::DeleteAndClearPages(vector<Sorter::Page>* pages) {
for (Page& page : *pages) {
if (page.is_open()) page.Close(sorter_->buffer_pool_client_);
}
pages->clear();
}
Sorter::Run::~Run() {
DCHECK(fixed_len_pages_.empty());
DCHECK(var_len_pages_.empty());
DCHECK(!var_len_copy_page_.is_open());
}
Sorter::TupleIterator::TupleIterator(Sorter::Run* run, int64_t index)
: index_(index), tuple_(nullptr) {
DCHECK(run->is_finalized_);
DCHECK_GE(index, 0);
DCHECK_LE(index, run->num_tuples());
// If the run is empty, only index_ and tuple_ are initialized.
if (run->num_tuples() == 0) {
DCHECK_EQ(index, 0);
return;
}
const int tuple_size = run->sort_tuple_size_;
uint32_t page_offset;
if (UNLIKELY(index == run->num_tuples())) {
// If the iterator is initialized past the end, set up 'buffer_start_index_',
// 'buffer_end_index_' and 'page_index_' for the last page, then set 'tuple_' to
// 'tuple_size' bytes past the last tuple, so everything is correct when Prev() is
// invoked.
page_index_ = run->fixed_len_pages_.size() - 1;
page_offset = ((index - 1) % run->page_capacity_) * tuple_size + tuple_size;
} else {
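// In-bounds case: tuples are laid out contiguously, 'page_capacity_' tuples
// per page, so the page index and intra-page offset follow directly.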
page_index_ = index / run->page_capacity_;
page_offset = (index % run->page_capacity_) * tuple_size;
}
buffer_start_index_ = page_index_ * run->page_capacity_;
buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
tuple_ = run->fixed_len_pages_[page_index_].data() + page_offset;
}
Sorter::TupleSorter::TupleSorter(Sorter* parent, const TupleRowComparator& comp,
int tuple_size, RuntimeState* state)
: parent_(parent),
tuple_size_(tuple_size),
comparator_(comp),
num_comparisons_till_free_(state->batch_size()),
state_(state) {
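// Scratch buffers each hold a single sort tuple: one for the temporary tuple
// used while sorting, the other for swapping tuples in place.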
temp_tuple_buffer_ = new uint8_t[tuple_size];
swap_buffer_ = new uint8_t[tuple_size];
}
Sorter::TupleSorter::~TupleSorter() {
delete[] temp_tuple_buffer_;
delete[] swap_buffer_;
}
Status Sorter::TupleSorter::Sort(Run* run) {
DCHECK(run->is_finalized());
DCHECK(!run->is_sorted());
run_ = run;
RETURN_IF_ERROR(SortHelper(TupleIterator::Begin(run_), TupleIterator::End(run_)));
run_->set_sorted();
return Status::OK();
}
Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
const std::vector<bool>& is_asc_order, const std::vector<bool>& nulls_first,
const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
MemTracker* mem_tracker, BufferPool::ClientHandle* buffer_pool_client,
int64_t page_len, RuntimeProfile* profile, RuntimeState* state,
const string& node_label, bool enable_spilling)
: node_label_(node_label),
state_(state),
expr_perm_pool_(mem_tracker),
expr_results_pool_(mem_tracker),
compare_less_than_(ordering_exprs, is_asc_order, nulls_first),
in_mem_tuple_sorter_(nullptr),
buffer_pool_client_(buffer_pool_client),
page_len_(page_len),
has_var_len_slots_(false),
sort_tuple_exprs_(sort_tuple_exprs),
mem_tracker_(mem_tracker),
output_row_desc_(output_row_desc),
enable_spilling_(enable_spilling),
unsorted_run_(nullptr),
merge_output_run_(nullptr),
profile_(profile),
initial_runs_counter_(nullptr),
num_merges_counter_(nullptr),
in_mem_sort_timer_(nullptr),
sorted_data_size_(nullptr),
run_sizes_(nullptr) {}
Sorter::~Sorter() {
DCHECK(sorted_runs_.empty());
DCHECK(merging_runs_.empty());
DCHECK(unsorted_run_ == nullptr);
DCHECK(merge_output_run_ == nullptr);
}
Status Sorter::Prepare(ObjectPool* obj_pool) {
DCHECK(in_mem_tuple_sorter_ == nullptr) << "Already prepared";
// Page byte offsets are packed into uint32_t values, which limits the supported
// page size.
if (page_len_ > numeric_limits<uint32_t>::max()) {
return Status(Substitute(
"Page size $0 exceeded maximum supported in sorter ($1)",
PrettyPrinter::PrintBytes(page_len_),
PrettyPrinter::PrintBytes(numeric_limits<uint32_t>::max())));
}
TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
if (sort_tuple_desc->byte_size() > page_len_) {
return Status(TErrorCode::MAX_ROW_SIZE,
PrettyPrinter::Print(sort_tuple_desc->byte_size(), TUnit::BYTES), node_label_,
PrettyPrinter::Print(state_->query_options().max_row_size, TUnit::BYTES));
}
has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots();
in_mem_tuple_sorter_.reset(
new TupleSorter(this, compare_less_than_, sort_tuple_desc->byte_size(), state_));
if (enable_spilling_) {
initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT);
spilled_runs_counter_ = ADD_COUNTER(profile_, "SpilledRuns", TUnit::UNIT);
num_merges_counter_ = ADD_COUNTER(profile_, "TotalMergesPerformed", TUnit::UNIT);
} else {
initial_runs_counter_ = ADD_COUNTER(profile_, "RunsCreated", TUnit::UNIT);
}
in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime");
sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun", TUnit::UNIT);
RETURN_IF_ERROR(ScalarExprEvaluator::Create(sort_tuple_exprs_, state_, obj_pool,
&expr_perm_pool_, &expr_results_pool_, &sort_tuple_expr_evals_));
return Status::OK();
}
Status Sorter::Codegen(RuntimeState* state) {
return compare_less_than_.Codegen(state);
}
Status Sorter::Open() {
DCHECK(in_mem_tuple_sorter_ != nullptr) << "Not prepared";
DCHECK(unsorted_run_ == nullptr) << "Already open";
RETURN_IF_ERROR(compare_less_than_.Open(&obj_pool_, state_, &expr_perm_pool_,
&expr_results_pool_));
TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
unsorted_run_ = run_pool_.Add(new Run(this, sort_tuple_desc, true));
RETURN_IF_ERROR(unsorted_run_->Init());
RETURN_IF_ERROR(ScalarExprEvaluator::Open(sort_tuple_expr_evals_, state_));
return Status::OK();
}
// Must be kept in sync with SortNode.computeNodeResourceProfile() in fe.
int64_t Sorter::ComputeMinReservation() const {
int min_buffers_required = enable_spilling_ ? MIN_BUFFERS_PER_MERGE : 1;
// Fixed and var-length pages are separate, so we need double the pages
// if there is var-length data.
if (output_row_desc_->HasVarlenSlots()) min_buffers_required *= 2;
return min_buffers_required * page_len_;
}
Status Sorter::AddBatch(RowBatch* batch) {
DCHECK(unsorted_run_ != nullptr);
DCHECK(batch != nullptr);
DCHECK(enable_spilling_);
int num_processed = 0;
int cur_batch_index = 0;
while (cur_batch_index < batch->num_rows()) {
RETURN_IF_ERROR(AddBatchNoSpill(batch, cur_batch_index, &num_processed));
cur_batch_index += num_processed;
if (cur_batch_index < batch->num_rows()) {
// The current run is full. Sort it, spill it and begin the next one.
RETURN_IF_ERROR(state_->StartSpilling(mem_tracker_));
RETURN_IF_ERROR(SortCurrentInputRun());
RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
unsorted_run_ =
run_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0], true));
RETURN_IF_ERROR(unsorted_run_->Init());
}
}
// Clear any temporary allocations made while materializing the sort tuples.
expr_results_pool_.Clear();
return Status::OK();
}
Status Sorter::AddBatchNoSpill(RowBatch* batch, int start_index, int* num_processed) {
DCHECK(batch != nullptr);
RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, num_processed));
// Clear any temporary allocations made while materializing the sort tuples.
expr_results_pool_.Clear();
return Status::OK();
}
Status Sorter::InputDone() {
// Sort the tuples in the last run.
RETURN_IF_ERROR(SortCurrentInputRun());
if (sorted_runs_.size() == 1) {
// The entire input fit in one run. Read sorted rows in GetNext() directly from the
// in-memory sorted run.
DCHECK(sorted_runs_.back()->is_pinned());
RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead());
return Status::OK();
}
DCHECK(enable_spilling_);
// Unpin the final run to free up memory for the merge.
// TODO: we could keep it in memory in some circumstances as an optimisation, once
// we have a buffer pool with more reliable reservations (IMPALA-3200).
RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
// Merge intermediate runs until we have set up the final merge.
return MergeIntermediateRuns();
}
Status Sorter::GetNext(RowBatch* output_batch, bool* eos) {
if (sorted_runs_.size() == 1) {
DCHECK(sorted_runs_.back()->is_pinned());
return sorted_runs_.back()->GetNext<false>(output_batch, eos);
} else {
RETURN_IF_ERROR(merger_->GetNext(output_batch, eos));
// Clear any temporary allocations made by the merger.
expr_results_pool_.Clear();
return Status::OK();
}
}
void Sorter::Reset() {
DCHECK(unsorted_run_ == nullptr) << "Cannot Reset() before calling InputDone()";
merger_.reset();
// Free resources from the current runs.
CleanupAllRuns();
compare_less_than_.Close(state_);
}
void Sorter::Close(RuntimeState* state) {
CleanupAllRuns();
compare_less_than_.Close(state);
ScalarExprEvaluator::Close(sort_tuple_expr_evals_, state);
expr_perm_pool_.FreeAll();
expr_results_pool_.FreeAll();
obj_pool_.Clear();
}
void Sorter::CleanupAllRuns() {
Run::CleanupRuns(&sorted_runs_);
Run::CleanupRuns(&merging_runs_);
if (unsorted_run_ != nullptr) unsorted_run_->CloseAllPages();
unsorted_run_ = nullptr;
if (merge_output_run_ != nullptr) merge_output_run_->CloseAllPages();
merge_output_run_ = nullptr;
run_pool_.Clear();
}
Status Sorter::SortCurrentInputRun() {
RETURN_IF_ERROR(unsorted_run_->FinalizeInput());
{
SCOPED_TIMER(in_mem_sort_timer_);
RETURN_IF_ERROR(in_mem_tuple_sorter_->Sort(unsorted_run_));
}
sorted_runs_.push_back(unsorted_run_);
sorted_data_size_->Add(unsorted_run_->TotalBytes());
run_sizes_->UpdateCounter(unsorted_run_->num_tuples());
unsorted_run_ = nullptr;
RETURN_IF_CANCELLED(state_);
return Status::OK();
}
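// Each run in a merge needs one pinned buffer for its fixed-len pages, plus a
// second one if it has var-len pages, so the number of runs that can take part
// in one merge is limited by the client's unused buffer reservation.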
int Sorter::MaxRunsInNextMerge() const {
int num_available_buffers = buffer_pool_client_->GetUnusedReservation() / page_len_;
DCHECK_GE(num_available_buffers, ComputeMinReservation() / page_len_);
int num_runs_in_one_merge = 0;
int num_required_buffers = 0;
for (int i = 0; i < sorted_runs_.size(); ++i) {
int num_buffers_for_this_run = (sorted_runs_[i]->HasVarLenPages()) ? 2 : 1;
if (num_required_buffers + num_buffers_for_this_run <= num_available_buffers) {
num_required_buffers += num_buffers_for_this_run;
++num_runs_in_one_merge;
} else {
// Not enough buffers are available to merge all remaining runs in one final
// merge, so an intermediate merge is required. Increase the required buffer
// count to include the output run of the intermediate merge.
num_required_buffers +=
(output_row_desc_->tuple_descriptors()[0]->HasVarlenSlots()) ? 2 : 1;
// Reduce the number of runs in this merge until they fit in the available
// buffer pool memory.
for (int j = i - 1; j >= 0; --j) {
num_required_buffers -= sorted_runs_[j]->HasVarLenPages() ? 2 : 1;
--num_runs_in_one_merge;
if (num_required_buffers <= num_available_buffers) break;
}
DCHECK_LE(num_required_buffers, num_available_buffers);
break;
}
}
DCHECK_GT(num_runs_in_one_merge, 1);
return num_runs_in_one_merge;
}
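// Tries to grow the buffer reservation until all sorted runs could fit into a
// single merge, requesting one page-sized increment at a time.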
void Sorter::TryToIncreaseMemAllocationForMerge() {
int pages_needed_for_full_merge = 0;
for (auto run : sorted_runs_) {
pages_needed_for_full_merge += (run->HasVarLenPages()) ? 2 : 1;
}
int available_pages = buffer_pool_client_->GetUnusedReservation() / page_len_;
// Try to reserve the pages still needed beyond what is currently available.
// Stop once no more memory can be reserved.
for (int i = 0; i < pages_needed_for_full_merge - available_pages; ++i) {
if (!buffer_pool_client_->IncreaseReservation(page_len_)) return;
}
}
int Sorter::GetNumOfRunsForMerge() const {
int max_runs_in_next_merge = MaxRunsInNextMerge();
// Check if all the runs fit in a final merge. Unlike an intermediate merge, the
// final merge does not need an extra run for the output.
if (max_runs_in_next_merge == sorted_runs_.size()) {
return max_runs_in_next_merge;
}
// If this is the last intermediate merge before the final merge, distribute the
// runs between the intermediate and final merges to saturate the final merge
// with as many runs as possible, reducing how often the same rows are re-merged.
if (max_runs_in_next_merge * 2 >= sorted_runs_.size()) {
return sorted_runs_.size() - max_runs_in_next_merge;
}
return max_runs_in_next_merge;
}
Status Sorter::MergeIntermediateRuns() {
DCHECK_GE(sorted_runs_.size(), 2);
// Attempt to allocate more memory before doing intermediate merges. This may
// be possible if other operators have relinquished memory after the sort has built
// its runs.
TryToIncreaseMemAllocationForMerge();
while (true) {
int num_of_runs_to_merge = GetNumOfRunsForMerge();
DCHECK(merge_output_run_ == nullptr) << "Should have finished previous merge.";
RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge));
// If CreateMerger() consumed all the sorted runs, we have set up the final merge.
if (sorted_runs_.empty()) return Status::OK();
merge_output_run_ = run_pool_.Add(
new Run(this, output_row_desc_->tuple_descriptors()[0], false));
RETURN_IF_ERROR(merge_output_run_->Init());
RETURN_IF_ERROR(ExecuteIntermediateMerge(merge_output_run_));
sorted_runs_.push_back(merge_output_run_);
merge_output_run_ = nullptr;
}
return Status::OK();
}
Status Sorter::CreateMerger(int num_runs) {
DCHECK_GE(num_runs, 2);
DCHECK_GE(sorted_runs_.size(), 2);
// Clean up the runs from the previous merge.
Run::CleanupRuns(&merging_runs_);
// TODO: 'deep_copy_input' is set to true, which forces the merger to copy all rows
// from the runs being merged. This is unnecessary overhead that is not required if we
// correctly transfer resources.
merger_.reset(
new SortedRunMerger(compare_less_than_, output_row_desc_, profile_, true));
vector<function<Status (RowBatch**)>> merge_runs;
merge_runs.reserve(num_runs);
for (int i = 0; i < num_runs; ++i) {
Run* run = sorted_runs_.front();
RETURN_IF_ERROR(run->PrepareRead());
// Run::GetNextBatch() is used by the merger to retrieve a batch of rows to merge
// from this run.
merge_runs.emplace_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1));
sorted_runs_.pop_front();
merging_runs_.push_back(run);
}
RETURN_IF_ERROR(merger_->Prepare(merge_runs));
num_merges_counter_->Add(1);
return Status::OK();
}
Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
RowBatch intermediate_merge_batch(
output_row_desc_, state_->batch_size(), mem_tracker_);
bool eos = false;
while (!eos) {
// Copy rows into the new run until done.
int num_copied;
RETURN_IF_CANCELLED(state_);
// Clear any temporary allocations made by the merger.
expr_results_pool_.Clear();
RETURN_IF_ERROR(merger_->GetNext(&intermediate_merge_batch, &eos));
RETURN_IF_ERROR(
merged_run->AddIntermediateBatch(&intermediate_merge_batch, 0, &num_copied));
DCHECK_EQ(num_copied, intermediate_merge_batch.num_rows());
intermediate_merge_batch.Reset();
}
RETURN_IF_ERROR(merged_run->FinalizeInput());
return Status::OK();
}
bool Sorter::HasSpilledRuns() const {
// All runs in 'merging_runs_' are spilled. 'sorted_runs_' can contain at most one
// non-spilled run.
return !merging_runs_.empty() || sorted_runs_.size() > 1 ||
(sorted_runs_.size() == 1 && !sorted_runs_.back()->is_pinned());
}
} // namespace impala