blob: ea8275a63921d27e887349bfc56e5794922fc153 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Definitions of classes that are internal to the Sorter but shared between .cc files.
#pragma once
#include "sorter.h"
namespace impala {
/// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
/// The Page can be in four states:
/// * Closed: The page starts in this state before Init() is called. Calling
/// ExtractBuffer() or Close() puts the page back in this state. No other operations
/// are valid on a closed page.
/// * In memory: the page is pinned and the buffer is in memory. data() is valid. The
/// page is in this state after Init(). If the page is pinned but not in memory, it
/// can be brought into this state by calling WaitForBuffer().
/// * Unpinned: the page was unpinned by calling Unpin(). It is invalid to access the
/// page's buffer.
/// * Pinned but not in memory: Pin() was called on the unpinned page, but
/// WaitForBuffer() has not been called. It is invalid to access the page's buffer.
class Sorter::Page {
 public:
  /// Constructs a page in the closed state by calling Reset(). Init() must be called
  /// before any other operation is valid.
  Page() { Reset(); }

  /// Create a new page of length 'sorter->page_len_' bytes using
  /// 'sorter->buffer_pool_client_'. Caller must ensure the client has enough
  /// reservation for the page.
  Status Init(Sorter* sorter);

  /// Extract the buffer from the page. The page must be in memory. When this function
  /// returns the page is closed.
  BufferPool::BufferHandle ExtractBuffer(BufferPool::ClientHandle* client);

  /// Allocate 'len' bytes in the current page. The page must be in memory, and the
  /// amount to allocate cannot exceed BytesRemaining().
  uint8_t* AllocateBytes(int64_t len);

  /// Free the last 'len' bytes allocated from AllocateBytes(). The page must be in
  /// memory.
  void FreeBytes(int64_t len);

  /// Return the number of bytes remaining in the page, i.e. the page length minus the
  /// length of valid data already written. Const-qualified like the other accessors so
  /// that it can be called through a const Page.
  int64_t BytesRemaining() const { return len() - valid_data_len_; }

  /// Brings a pinned page into memory, if not already in memory, and sets 'data_' to
  /// point to the page's buffer.
  Status WaitForBuffer();

  /// Helper to pin the page. Caller must ensure the client has enough reservation
  /// remaining to pin the page. Only valid to call on an unpinned page.
  Status Pin(BufferPool::ClientHandle* client);

  /// Helper to unpin the page.
  void Unpin(BufferPool::ClientHandle* client);

  /// Destroy the page with 'client'.
  void Close(BufferPool::ClientHandle* client);

  /// Returns the length of valid data written to the page.
  int64_t valid_data_len() const { return valid_data_len_; }

  /// Returns a pointer to the start of the page's buffer. Only valid to call if the
  /// page is in memory.
  uint8_t* data() const {
    DCHECK(data_ != nullptr);
    return data_;
  }

  /// Thin accessors that forward to the underlying 'handle_'.
  int64_t len() const { return handle_.len(); }
  bool is_open() const { return handle_.is_open(); }
  bool is_pinned() const { return handle_.is_pinned(); }
  std::string DebugString() const { return handle_.DebugString(); }

 private:
  /// Reset the page to an uninitialized state. 'handle_' must already be closed.
  void Reset();

  /// Helper to get the singleton buffer pool.
  static BufferPool* pool();

  /// Handle of the buffer pool page wrapped by this object.
  BufferPool::PageHandle handle_;

  /// Length of valid data written to the page.
  int64_t valid_data_len_;

  /// Cached pointer to the buffer in 'handle_'. NULL if the page is unpinned. May be
  /// NULL or not NULL if the page is pinned. Can be populated by calling
  /// WaitForBuffer() on a pinned page.
  uint8_t* data_;
};
/// A run is a sequence of tuples. The run can be sorted or unsorted (in which case the
/// Sorter will sort it). A run comprises a sequence of fixed-length pages containing
/// the tuples themselves (i.e. fixed-len slots that may contain ptrs to var-length
/// data), and an optional sequence of var-length pages containing the var-length data.
///
/// Runs are either "initial runs" constructed from the sorter's input by evaluating
/// the expressions in 'sort_tuple_exprs_' or "intermediate runs" constructed
/// by merging already-sorted runs. Initial runs are sorted in-place in memory. Once
/// sorted, runs can be spilled to disk to free up memory. Sorted runs are merged by
/// SortedRunMerger, either to produce the final sorted output or to produce another
/// sorted run.
///
/// The expected calling sequence of functions is as follows:
/// * Init() to initialize the run and allocate initial pages.
/// * Add*Batch() to add batches of tuples to the run.
/// * FinalizeInput() to signal that no more batches will be added.
/// * If the run is unsorted, it must be sorted. After that set_sorted() must be called.
/// * Once sorted, the run is ready to read in sorted order for merging or final output.
/// * PrepareRead() to allocate resources for reading the run.
/// * GetNext() (if there was a single run) or GetNextBatch() (when merging multiple
///   runs) to read from the run.
/// * Once reading is done, CloseAllPages() should be called to free resources.
class Sorter::Run {
public:
/// Creates a run for the Sorter 'parent' whose tuples use the layout described by
/// 'sort_tuple_desc'. 'initial_run' is true for a run built from the sorter's input and
/// false for an intermediate run produced by merging sorted runs. The constructor is
/// defined out of line; Init() must still be called before rows are added.
Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run);
/// Destructor, defined out of line.
~Run();
/// Initialize the run for input rows by allocating the minimum number of required
/// pages - one page for fixed-len data added to fixed_len_pages_, one for the
/// initially unsorted var-len data added to var_len_pages_, and one to copy sorted
/// var-len data into var_len_copy_page_.
Status Init();
/// Add the rows from 'batch' starting at 'start_index' to the current run. Returns
/// the number of rows actually added in 'num_processed'. If the run is full (no more
/// pages can be allocated), 'num_processed' may be less than the number of remaining
/// rows in the batch. AddInputBatch() materializes the input rows using the
/// expressions in sorter_->sort_tuple_expr_evals_, while AddIntermediateBatch() just
/// copies rows.
Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed);
Status AddIntermediateBatch(RowBatch* batch, int start_index, int* num_processed);
/// Called after the final call to Add*Batch() to do any bookkeeping necessary to
/// finalize the run. Must be called before sorting or merging the run.
Status FinalizeInput();
/// Unpins all the pages in a sorted run. Var-length column data is copied into new
/// pages in sorted order. Pointers in the original tuples are converted to offsets
/// from the beginning of the sequence of var-len data pages. Returns an error and
/// may leave some pages pinned if an error is encountered.
Status UnpinAllPages();
/// Closes all pages and clears vectors of pages.
void CloseAllPages();
/// Prepare to read a sorted run. Pins the first page(s) in the run if the run was
/// previously unpinned. If the run was unpinned, try to pin the initial fixed and
/// var len pages in the run. If it couldn't pin them, an error Status is returned.
Status PrepareRead();
/// Interface for merger - get the next batch of rows from this run. This run still
/// owns the returned batch. Calls GetNext(RowBatch*, bool*).
Status GetNextBatch(RowBatch** sorted_batch);
/// Fill output_batch with rows from this run. If CONVERT_OFFSET_TO_PTR is true,
/// offsets in var-length slots are converted back to pointers. Only row pointers are
/// copied into output_batch. eos is set to true after all rows from the run are
/// returned. If eos is true, the returned output_batch has zero rows and has no
/// attached pages. If this run was unpinned, one page (two if there are var-len
/// slots) is pinned while rows are filled into output_batch. The page is unpinned
/// before the next page is pinned, so at most one (two if there are var-len slots)
/// page(s) will be pinned at once. If the run was pinned, the pages are not unpinned
/// and each page is attached to 'output_batch' once all rows referencing data in the
/// page have been returned, either in the current batch or previous batches. In both
/// pinned and unpinned cases, all rows in output_batch will reference at most one
/// fixed-len and one var-len page.
template <bool CONVERT_OFFSET_TO_PTR>
Status GetNext(RowBatch* output_batch, bool* eos);
/// Delete all pages in 'runs' and clear 'runs'.
static void CleanupRuns(std::deque<Run*>* runs);
/// Return total amount of fixed and var len data in run, not including pages that
/// were already transferred or closed.
int64_t TotalBytes() const;
/// Trivial accessors for the run's state flags and tuple count. set_sorted() only
/// records that the run is now sorted; it does not sort anything itself.
bool is_pinned() const { return is_pinned_; }
bool is_finalized() const { return is_finalized_; }
bool is_sorted() const { return is_sorted_; }
void set_sorted() { is_sorted_ = true; }
int64_t num_tuples() const { return num_tuples_; }
/// Returns true if we have var-len pages in the run.
bool HasVarLenPages() const {
// Shouldn't have any pages unless there are slots.
DCHECK(var_len_pages_.empty() || has_var_len_slots_);
return !var_len_pages_.empty();
}
private:
/// TupleIterator needs access to internals to iterate over tuples.
friend class TupleIterator;
/// Templatized implementation of Add*Batch() functions.
/// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance and must
/// match 'initial_run_' and 'has_var_len_slots_'.
template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
Status AddBatchInternal(
RowBatch* batch, int start_index, int* num_processed);
/// Finalize the list of pages: delete empty final pages and unpin the previous page
/// if the run is unpinned.
Status FinalizePages(vector<Page>* pages);
/// Collect the non-null var-len (e.g. STRING) slots from 'src' in 'var_len_values' and
/// return the total length of all var-len values in 'total_var_len'.
void CollectNonNullVarSlots(
Tuple* src, vector<StringValue*>* var_len_values, int* total_var_len);
/// Selects how TryAddPage() makes room for the new page; see TryAddPage().
enum AddPageMode { KEEP_PREV_PINNED, UNPIN_PREV };
/// Try to extend the current run by a page. If 'mode' is KEEP_PREV_PINNED, try to
/// allocate a new page, which may fail to extend the run due to lack of memory. If
/// mode is 'UNPIN_PREV', unpin the previous page in page_sequence before allocating
/// and adding a new page - this never fails due to lack of memory.
///
/// Returns an error status only if the buffer pool returns an error. If no error is
/// encountered, sets 'added' to indicate whether the run was extended and returns
/// Status::OK(). The new page is appended to 'page_sequence'.
Status TryAddPage(AddPageMode mode, vector<Page>* page_sequence, bool* added);
/// Appends a new page to 'page_sequence'. Caller must ensure enough
/// reservation is available to create the page.
///
/// Returns an error status only if the buffer pool returns an error. If an error
/// is returned 'page_sequence' is left unmodified.
Status AddPage(vector<Page>* page_sequence);
/// Advance to the next read page. If the run is pinned, has no effect. If the run
/// is unpinned, the pin at 'page_index' was already attached to an output batch and
/// this function will pin the page at 'page_index' + 1 in 'pages'.
Status PinNextReadPage(vector<Page>* pages, int page_index);
/// Copy the StringValues in 'var_values' to 'dest' in order and update the StringValue
/// ptrs in 'dest' to point to the copied data.
void CopyVarLenData(const vector<StringValue*>& var_values, uint8_t* dest);
/// Copy the StringValues in 'var_values' to 'dest' in order. Update the StringValue
/// ptrs in 'dest' to contain a packed offset for the copied data comprising
/// page_index and the offset relative to page_start.
void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values, int page_index,
const uint8_t* page_start, uint8_t* dest);
/// Convert encoded offsets to valid pointers in tuple with layout 'sort_tuple_desc_'.
/// 'tuple' is modified in-place. Returns true if the pointers refer to the page at
/// 'var_len_pages_index_' and were successfully converted or false if the var len
/// data is in the next page, in which case 'tuple' is unmodified.
bool ConvertOffsetsToPtrs(Tuple* tuple);
/// NOTE(review): presumably returns the number of pages in 'pages' that are still
/// open; the definition is out of line - confirm there.
int NumOpenPages(const vector<Page>& pages);
/// Close all open pages and clear vector.
void DeleteAndClearPages(vector<Page>* pages);
/// Parent sorter object.
Sorter* const sorter_;
/// Materialized sort tuple. Input rows are materialized into 1 tuple (with descriptor
/// sort_tuple_desc_) before sorting.
const TupleDescriptor* sort_tuple_desc_;
/// The size in bytes of the sort tuple.
const int sort_tuple_size_;
/// Number of tuples per page in a run. This gets multiplied with
/// TupleIterator::page_index_ in various places and to make sure we don't overflow
/// the result of that operation we make this int64_t here.
const int64_t page_capacity_;
/// Whether the sort tuple has var-len slots (presumably derived from
/// 'sort_tuple_desc_' in the constructor - confirm). HasVarLenPages() asserts that
/// var-len pages only exist when this is true.
const bool has_var_len_slots_;
/// True if this is an initial run. False implies this is a sorted intermediate run
/// resulting from merging other runs.
const bool initial_run_;
/// True if all pages in the run are pinned. Initial runs start off pinned and
/// can be unpinned. Intermediate runs are always unpinned.
bool is_pinned_;
/// True after FinalizeInput() is called. No more tuples can be added after the
/// run is finalized.
bool is_finalized_;
/// True if the tuples in the run are currently in sorted order.
/// Always true for intermediate runs.
bool is_sorted_;
/// Sequence of pages in this run containing the fixed-length portion of the sort
/// tuples comprising this run. The data pointed to by the var-len slots are in
/// var_len_pages_. A run can have zero pages if no rows are appended.
/// If the run is sorted, the tuples in fixed_len_pages_ will be in sorted order.
/// fixed_len_pages_[i] is closed iff it has been transferred or deleted.
vector<Page> fixed_len_pages_;
/// Sequence of pages in this run containing the var-length data corresponding to the
/// var-length columns from fixed_len_pages_. In intermediate runs, the var-len data
/// is always stored in the same order as the fixed-length tuples. In initial runs,
/// the var-len data is initially in unsorted order, but is reshuffled into sorted
/// order in UnpinAllPages(). A run can have no var len pages if there are no var len
/// slots or if all the var len data is empty or NULL.
/// var_len_pages_[i] is closed iff it has been transferred or deleted.
vector<Page> var_len_pages_;
/// For initial unsorted runs, an extra pinned page is needed to reorder var-len data
/// into sorted order in UnpinAllPages(). 'var_len_copy_page_' stores this extra
/// page. Deleted in UnpinAllPages().
/// TODO: in case of in-memory runs, this could be deleted earlier to free up memory.
Page var_len_copy_page_;
/// Number of tuples added so far to this run.
int64_t num_tuples_;
/// Number of tuples returned via GetNext(), maintained for debug purposes.
int64_t num_tuples_returned_;
/// Used to implement GetNextBatch() interface required for the merger.
boost::scoped_ptr<RowBatch> buffered_batch_;
/// Members used when a run is read in GetNext().
/// The indices into 'fixed_len_pages_' and 'var_len_pages_' of the pages being read in
/// GetNext().
int fixed_len_pages_index_;
int var_len_pages_index_;
/// If true, the last call to GetNext() reached the end of the previous fixed or
/// var-len page. The next call to GetNext() must increment 'fixed_len_pages_index_'
/// or 'var_len_pages_index_'. It must also pin the next page if the run is unpinned.
bool end_of_fixed_len_page_;
bool end_of_var_len_page_;
/// Offset into the current fixed length data page being processed.
int fixed_len_page_offset_;
};
/// Helper class used to iterate over tuples in a run during sorting.
class Sorter::TupleIterator {
 public:
  /// Builds an iterator positioned at tuple number 'index' of 'run', which must be
  /// finalized. Valid values of 'index' are [0, run->num_tuples()]; the upper bound
  /// yields a one-past-the-end iterator, from which Prev() moves to the last tuple in
  /// the run.
  TupleIterator(Sorter::Run* run, int64_t index);

  /// Default constructor for declaring local variables. The result is an invalid
  /// iterator (all indices -1, null tuple) that must be assigned before use.
  TupleIterator()
    : index_(-1),
      tuple_(nullptr),
      buffer_start_index_(-1),
      buffer_end_index_(-1),
      page_index_(-1) {}

  /// Returns an iterator positioned at the first tuple of 'run'.
  static TupleIterator Begin(Sorter::Run* run) { return TupleIterator(run, 0); }

  /// Returns an iterator positioned one past the last tuple of 'run'.
  static TupleIterator End(Sorter::Run* run) {
    return TupleIterator(run, run->num_tuples());
  }

  /// Steps forward by one tuple: increments 'index_' and moves 'tuple_' to the next
  /// tuple, incrementing 'page_index_' and switching pages when the next tuple lives
  /// in the following page. The iterator may be advanced one past the last tuple, in
  /// which case 'tuple_' must not be dereferenced. 'run' and 'tuple_size' are
  /// parameters rather than members so that the many iterators used by the
  /// perf-critical sorting algorithms do not each store the same values.
  void Next(Sorter::Run* run, int tuple_size);

  /// Mirror image of Next(). The iterator may be moved one before the first tuple in
  /// the run, in which case 'tuple_' must not be dereferenced.
  void Prev(Sorter::Run* run, int tuple_size);

  /// Index of the current tuple within the run.
  int64_t index() const { return index_; }

  /// The current tuple, viewed as a Tuple.
  Tuple* tuple() const {
    Tuple* const current = reinterpret_cast<Tuple*>(tuple_);
    return current;
  }

  /// The current tuple in TupleRow format, obtained by reinterpreting the address of
  /// 'tuple_'. The caller should not modify the row.
  const TupleRow* row() const {
    const TupleRow* const as_row = reinterpret_cast<const TupleRow*>(&tuple_);
    return as_row;
  }

 private:
  /// Slow path of Next(): moves to the next page in the run, or does nothing if
  /// already at the end of the run.
  void NextPage(Sorter::Run* run);

  /// Slow path of Prev(): moves to the previous page in the run, or does nothing if
  /// already at the beginning of the run.
  void PrevPage(Sorter::Run* run);

  /// Index of the current tuple in the run. Can be -1 or run->num_tuples() if Next()
  /// or Prev() moved the iterator outside of the run.
  int64_t index_;

  /// Pointer to the current tuple. Becomes an invalid pointer outside of the current
  /// buffer if Next() or Prev() moved the iterator outside of the run.
  uint8_t* tuple_;

  /// Tuple index bounds of the page at 'page_index_': the current page holds the
  /// tuples with indices in [buffer_start_index_, buffer_end_index_).
  int64_t buffer_start_index_;
  int64_t buffer_end_index_;

  /// Index into fixed_len_pages_ of the page containing the current tuple. If
  /// 'index_' is negative or past the end of the run, this refers to the first or
  /// last page in the run respectively.
  int page_index_;
};
/// Sorts a sequence of tuples from a run in place using a provided tuple comparator.
/// Quick sort is used for sequences of tuples larger than 16 elements, and insertion
/// sort is used for smaller sequences. The TupleSorter is initialized with a
/// RuntimeState instance to check for cancellation during an in-memory sort.
class Sorter::TupleSorter {
public:
/// Creates a sorter for 'parent' that orders tuples using 'comparator'. 'tuple_size'
/// is the in-memory size of each tuple and 'state' is used to check for cancellation.
/// 'comparator' is held by reference, so it must outlive this object.
TupleSorter(Sorter* parent, const TupleRowComparator& comparator,
int tuple_size, RuntimeState* state);
/// Destructor, defined out of line. NOTE(review): presumably releases
/// 'temp_tuple_buffer_' and 'swap_buffer_', which this instance owns - confirm.
~TupleSorter();
/// Performs a quicksort for tuples in 'run' followed by an insertion sort to
/// finish smaller ranges. Only valid to call if this is an initial run that has not
/// yet been sorted. Returns an error status if any error is encountered or if the
/// query is cancelled.
Status Sort(Run* run);
private:
/// Size threshold (in tuples) separating quicksort from insertion sort.
/// NOTE(review): the exact comparison used against this value lives in the
/// out-of-line sort implementation - confirm there.
static const int INSERTION_THRESHOLD = 16;
/// Parent sorter object passed to the constructor.
Sorter* const parent_;
/// Size of the tuples in memory.
const int tuple_size_;
/// Tuple comparator with method Less() that returns true if lhs < rhs.
const TupleRowComparator& comparator_;
/// Number of times comparator_.Less() can be invoked again before
/// comparator_.expr_results_pool_.Clear() needs to be called.
int num_comparisons_till_free_;
/// Runtime state instance to check for cancellation. Not owned.
RuntimeState* const state_;
/// The run to be sorted.
Run* run_;
/// Temporarily allocated space to copy and swap tuples (Both are used in
/// Partition()). Owned by this TupleSorter instance.
uint8_t* temp_tuple_buffer_;
uint8_t* swap_buffer_;
/// Random number generator used to randomly choose pivots. We need a RNG that
/// can generate 64-bit ints. Quality of randomness doesn't need to be especially
/// high: Mersenne Twister should be more than adequate.
std::mt19937_64 rng_;
/// Wrapper around comparator_.Less(). Also call expr_results_pool_.Clear()
/// on every 'state_->batch_size()' invocations of comparator_.Less(). Returns true
/// if 'lhs' is less than 'rhs'.
bool Less(const TupleRow* lhs, const TupleRow* rhs);
/// Perform an insertion sort for rows in the range [begin, end) in a run.
/// Only valid to call for ranges of size at least 1.
Status InsertionSort(const TupleIterator& begin, const TupleIterator& end);
/// Partitions the sequence of tuples in the range [begin, end) in a run into two
/// groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and
/// tuples in the second group are >= pivot. Tuples are swapped in place to create the
/// groups and the index to the first element in the second group is returned in
/// 'cut'. Return an error status if any error is encountered or if the query is
/// cancelled.
Status Partition(TupleIterator begin, TupleIterator end,
const Tuple* pivot, TupleIterator* cut);
/// Performs a quicksort of rows in the range [begin, end) followed by insertion sort
/// for smaller groups of elements. Return an error status for any errors or if the
/// query is cancelled.
Status SortHelper(TupleIterator begin, TupleIterator end);
/// Select a pivot to partition [begin, end).
Tuple* SelectPivot(TupleIterator begin, TupleIterator end);
/// Return median of three tuples according to the sort comparator.
Tuple* MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3);
/// Swaps tuples pointed to by left and right using 'swap_tuple'. 'tuple_size' is
/// passed explicitly because this function is static and cannot read 'tuple_size_'.
static void Swap(Tuple* left, Tuple* right, Tuple* swap_tuple, int tuple_size);
};
} // namespace impala