// blob: 20960b723099a5684a6d4e2a7145ad717ec4eaa7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <deque>
#include "runtime/buffered_block_mgr2.h"
#include "util/tuple_row_compare.h"
namespace doris {
// Forward declarations for types that this header refers to through pointers or
// std::unique_ptr members.
class SortedRunMerger;
// NOTE(review): RuntimeProfile::Counter is also named in SpillSorter, which requires the
// full RuntimeProfile definition; presumably one of the includes above provides it
// transitively - confirm.
class RuntimeProfile;
class RowBatch;
// SpillSorter contains the external sort implementation. Its purpose is to sort arbitrarily
// large input data sets with a fixed memory budget by spilling data to disk if
// necessary. BufferedBlockMgr2 is used to allocate and manage blocks of data to be
// sorted.
//
// The client API for SpillSorter is as follows:
// add_batch() is used to add input rows to be sorted. Multiple tuples in an input row are
// materialized into a row with a single tuple (the sort tuple) using the materialization
// exprs in _sort_tuple_slot_expr_ctxs. The sort tuples are sorted according to the sort
// parameters and output by the sorter.
// add_batch() can be called multiple times.
//
// input_done() is called to indicate the end of input. If multiple sorted runs were
// created, it triggers intermediate merge steps (if necessary) and creates the final
// merger that returns results via get_next().
//
// get_next() is used to retrieve sorted rows. It can be called multiple times.
// add_batch(), input_done() and get_next() must be called in that order.
//
// Batches of input rows are collected into a sequence of pinned BufferedBlockMgr2 blocks
// called a run. The maximum size of a run is determined by the maximum available buffers
// in the block manager. After the run is full, it is sorted in memory, unpinned and the
// next run is collected. The variable-length column data (e.g. string slots) in the
// materialized sort tuples is stored in a separate sequence of blocks from the tuples
// themselves.
// When the blocks containing tuples in a run are unpinned, the var-len slot pointers are
// converted to offsets from the start of the first var-len data block. When a block is
// read back, these offsets are converted back to pointers.
// The in-memory sorter sorts the fixed-length tuples in-place. The output rows have the
// same schema as the materialized sort tuples.
//
// After the input is consumed, the sorter is left with one or more sorted runs. The
// client calls get_next(output_batch) to retrieve batches of sorted rows. If there are
// multiple runs, the runs are merged using SortedRunMerger to produce a stream of sorted
// tuples. At least one block per run (two if there are var-length slots) must be pinned
// in memory during a merge, so multiple merges may be necessary if the number of runs is
// too large. During a merge, rows from multiple sorted input runs are compared and copied
// into a single larger run. One input batch is created to hold tuple rows for each
// input run, and one batch is created to hold deep copied rows (i.e. ptrs + data) from
// the output of the merge.
//
// If there is a single sorted run (i.e. no merge required), only tuple rows are
// copied into the output batch supplied by get_next, and the data itself is left in
// pinned blocks held by the sorter.
//
// Note that init() must be called right after the constructor.
//
// During a merge, one row batch is created for each input run, and one batch is created
// for the output of the merge (if it is not the final merge). It is assumed that the
// memory for these batches has already been accounted for in the memory budget for the
// sort. That is, the memory for these batches does not come out of the block buffer
// manager.
//
// TODO: Not necessary to actually copy var-len data - instead take ownership of the
// var-length data in the input batch. Copying can be deferred until a run is unpinned.
// TODO: When the first run is constructed, create a sequence of pointers to materialized
// tuples. If the input fits in memory, the pointers can be sorted instead of sorting the
// tuples in place.
class SpillSorter {
public:
    // compare_less_than is a comparator for the sort tuples (returns true if lhs < rhs).
    // sort_tuple_slot_expr_ctxs are the slot exprs used to materialize the tuple to be
    // sorted (one expr per slot in the sort tuple).
    // output_row_desc describes the sort tuple; it is not owned by the SpillSorter.
    // profile is the runtime profile for this sorter instance.
    // state is the runtime state used to check for cancellation; it is not owned.
    // NOTE(review): the constructor takes no merge batch size argument; how the batch
    // size used for merges is chosen is not visible in this header.
    SpillSorter(const TupleRowComparator& compare_less_than,
                const std::vector<ExprContext*>& sort_tuple_slot_expr_ctxs,
                RowDescriptor* output_row_desc, RuntimeProfile* profile, RuntimeState* state);

    ~SpillSorter();

    // Initialization code, including registration to the block_mgr and the initialization
    // of the _unsorted_run, both of these may fail.
    // Must be called right after the constructor.
    Status init();

    // Adds a batch of input rows to the current unsorted run.
    // May be called multiple times, but only before input_done().
    Status add_batch(RowBatch* batch);

    // Called to indicate there is no more input. Triggers the creation of merger(s) if
    // necessary.
    Status input_done();

    // Get the next batch of sorted output rows from the sorter.
    // May be called multiple times, after input_done().
    // NOTE(review): '*eos' is presumably set to true once all sorted rows have been
    // returned - confirm against the implementation.
    Status get_next(RowBatch* batch, bool* eos);

    // Resets all internal state like ExecNode::reset().
    // init() must have been called, add_batch()/get_next()/input_done()
    // may or may not have been called.
    Status reset();

    // Returns the current value of the _spilled flag. Const-qualified because it only
    // reads state, so it can also be called through a const SpillSorter.
    // NOTE(review): presumably true once data has been spilled to disk; where the flag is
    // set is not visible in this header.
    bool is_spilled() const { return _spilled; }

    // Estimate the memory overhead in bytes for an intermediate merge, based on the
    // maximum number of memory buffers available for the sort, the row descriptor for
    // the sorted tuples and the batch size used (in rows).
    // This is a pessimistic estimate of the memory needed by the sorter in addition to the
    // memory used by the block buffer manager. The memory overhead is 0 if the input fits
    // in memory. Merges incur additional memory overhead because row batches are created
    // to hold tuple rows from the input runs, and the merger itself deep-copies
    // sort-merged rows into its output batch.
    static uint64_t estimate_merge_mem(uint64_t available_blocks, RowDescriptor* row_desc,
                                       int merge_batch_size);

private:
    // Both helper classes are only declared here; their definitions are not part of
    // this header.
    class Run;
    class TupleSorter;

    // Create a SortedRunMerger from the first 'num_runs' sorted runs in _sorted_runs and
    // assign it to _merger. The runs to be merged are removed from _sorted_runs.
    // The SpillSorter sets the deep_copy_input flag to true for the merger, since the
    // blocks containing input run data will be unpinned as input runs are read.
    Status create_merger(int num_runs);

    // Repeatedly replaces multiple smaller runs in _sorted_runs with a single larger
    // merged run until the number of remaining runs is small enough for a single merge.
    // At least 1 (2 if var-len slots) block from each sorted run must be pinned for
    // a merge. If the number of sorted runs is too large, merge sets of smaller runs
    // into large runs until a final merge can be performed. An intermediate row batch
    // containing deep copied rows is used for the output of each intermediate merge.
    Status merge_intermediate_runs();

    // Sorts _unsorted_run and appends it to the list of sorted runs. Deletes any empty
    // blocks at the end of the run. Updates the sort bytes counter if necessary.
    Status sort_run();

    // NOTE: the declaration order of the data members below determines their
    // initialization order; keep it in sync with the constructor's initializer list.

    // Runtime state instance used to check for cancellation. Not owned.
    RuntimeState* const _state;

    // In memory sorter and less-than comparator.
    TupleRowComparator _compare_less_than;
    std::unique_ptr<TupleSorter> _in_mem_tuple_sorter;

    // Block manager object used to allocate, pin and release runs. Not owned by
    // SpillSorter.
    BufferedBlockMgr2* _block_mgr;

    // Handle to block mgr to make allocations from.
    BufferedBlockMgr2::Client* _block_mgr_client;

    // True if the tuples to be sorted have var-length slots.
    bool _has_var_len_slots;

    // Expressions used to materialize the sort tuple. Contains one expr per slot in the
    // tuple.
    std::vector<ExprContext*> _sort_tuple_slot_expr_ctxs;

    // Descriptor for the sort tuple. Input rows are materialized into 1 tuple before
    // sorting. Not owned by the SpillSorter.
    RowDescriptor* _output_row_desc;

    /////////////////////////////////////////
    // BEGIN: Members that must be reset()

    // The current unsorted run that is being collected. Is sorted and added to
    // _sorted_runs after it is full (i.e. number of blocks allocated == max available
    // buffers) or after the input is complete. Owned and placed in _obj_pool.
    // When it is added to _sorted_runs, it is set to nullptr.
    Run* _unsorted_run;

    // List of sorted runs that have been produced but not merged. _unsorted_run is added
    // to this list after an in-memory sort. Sorted runs produced by intermediate merges
    // are also added to this list. Runs are added to the object pool.
    std::deque<Run*> _sorted_runs;

    // Merger object (intermediate or final) currently used to produce sorted runs.
    // Only one merge is performed at a time. Will never be used if the input fits in
    // memory.
    std::unique_ptr<SortedRunMerger> _merger;

    // Runs that are currently processed by the _merger.
    // These runs can be deleted when we are done with the current merge.
    std::deque<Run*> _merging_runs;

    // Pool of owned Run objects. Maintains Runs objects across non-freeing reset() calls.
    ObjectPool _obj_pool;

    // END: Members that must be reset()
    /////////////////////////////////////////

    // Runtime profile and counters for this sorter instance.
    RuntimeProfile* _profile;
    RuntimeProfile::Counter* _initial_runs_counter;
    RuntimeProfile::Counter* _num_merges_counter;
    RuntimeProfile::Counter* _in_mem_sort_timer;
    RuntimeProfile::Counter* _sorted_data_size;

    // Flag reported by is_spilled().
    bool _spilled;
};
} // namespace doris