blob: cd07c68578cf41fa55c3d9030d10a1c2207e77e3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/sort/sorter.h"
#include <glog/logging.h>
#include <algorithm>
#include <cstddef>
#include <functional>
#include <ostream>
#include <string>
#include <utility>
#include "common/object_pool.h"
#include "core/block/block.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column.h"
#include "core/column/column_nullable.h"
#include "core/data_type/data_type.h"
#include "core/data_type/data_type_nullable.h"
#include "exec/common/util.hpp"
#include "exec/sort/sort_block.h"
#include "exprs/vexpr_context.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_profile.h"
#include "runtime/thread_context.h"
namespace doris {
class RowDescriptor;
} // namespace doris
namespace doris {
// When doing spillable sorting, each sorted block is spilled into a single file.
//
// In order to decrease memory pressure when merging
// multiple spilled blocks into one bigger sorted block, only part
// of each spilled block is read back into memory at a time.
//
// Currently the spilled blocks are split into small sub-blocks;
// each sub-block is serialized in PBlock format and appended
// to the spill file.
//
// Drops all buffered sorted blocks, replaces the unsorted block with an empty
// clone of itself, and zeroes the in-memory sorted-bytes counter.
void MergeSorterState::reset() {
    // Swap with an empty vector (rather than clear()) so the old storage is
    // released when `empty_blocks` goes out of scope.
    std::vector<std::shared_ptr<Block>> empty_blocks;
    _sorted_blocks.swap(empty_blocks);
    unsorted_block() = Block::create_unique(unsorted_block()->clone_empty());
    _in_mem_sorted_bocks_size = 0;
}
// Appends a sorted block to the in-memory list and updates the byte and row
// counters. Blocks with zero rows are ignored.
//
// `block` is taken by value as a sink parameter and moved into the list, so no
// extra shared_ptr reference-count update is performed.
void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
    const auto rows = block->rows();
    if (rows == 0) {
        return;
    }
    // Read everything needed from `block` before it is moved away.
    _in_mem_sorted_bocks_size += block->bytes();
    _sorted_blocks.emplace_back(std::move(block));
    _num_rows += rows;
}
// Builds the merge queue from all buffered sorted blocks.
//
// Each block is moved into a cursor created with `sort_description`; afterwards
// `_sorted_blocks` is cleared. Always returns Status::OK().
Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
    std::vector<MergeSortCursor> cursors;
    // The final size is known up front: one cursor per sorted block.
    cursors.reserve(_sorted_blocks.size());
    for (auto& block : _sorted_blocks) {
        cursors.emplace_back(
                MergeSortCursorImpl::create_shared(std::move(block), sort_description));
    }
    _queue = MergeSorterQueue(cursors);
    // Only moved-from pointers remain in the list at this point.
    _sorted_blocks.clear();
    return Status::OK();
}
// Reads the next batch of merged rows into `block`.
//
// In debug builds this checks that no sorted blocks are still buffered and that
// the unsorted block is empty, then delegates to _merge_sort_read_impl().
// `*eos` is set by the implementation. Always returns Status::OK().
Status MergeSorterState::merge_sort_read(doris::Block* block, int batch_size, bool* eos) {
DCHECK(_sorted_blocks.empty());
DCHECK(unsorted_block()->empty());
_merge_sort_read_impl(batch_size, block, eos);
return Status::OK();
}
// Pulls up to `batch_size` rows from the merge queue `_queue` into `block`.
//
// While `_offset` is still positive, leading rows are skipped instead of being
// copied, and `_offset` is decreased by the number of rows skipped.
// `*eos` is set to true when this call produced no rows at all.
void MergeSorterState::_merge_sort_read_impl(int batch_size, doris::Block* block, bool* eos) {
size_t num_columns = unsorted_block()->columns();
// NOTE(review): presumably reuses the memory of `block` when possible and takes
// the column layout from unsorted_block() — confirm against VectorizedUtils.
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, *unsorted_block());
MutableColumns& merged_columns = m_block.mutable_columns();
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
// process single element queue on merge_sort_read()
while (_queue.is_valid() && merged_rows < batch_size) {
auto [current, current_rows] = _queue.current();
// Never take more rows than still fit into this batch.
current_rows = std::min(current_rows, batch_size - merged_rows);
// `step` is the number of rows at the head of this run consumed by the
// remaining offset; they are skipped rather than copied.
size_t step = std::min(_offset, current_rows);
_offset -= step;
current_rows -= step;
if (current_rows) {
// Copy the surviving rows, starting right after the skipped ones.
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_range_from(*current->impl->columns[i],
current->impl->pos + step, current_rows);
}
merged_rows += current_rows;
}
// Advance past both skipped and copied rows, or drop the cursor from the
// queue when is_last() reports true for that advance.
if (!current->impl->is_last(current_rows + step)) {
_queue.next(current_rows + step);
} else {
_queue.remove_top();
}
}
block->set_columns(std::move(merged_columns));
// No rows produced by this call is reported as end of stream.
*eos = merged_rows == 0;
}
// Base-class read used by the spill path.
//
// This implementation does not use `batch_size`; it forwards `state`, `block`
// and `eos` to get_next() and returns its status unchanged.
Status Sorter::merge_sort_read_for_spill(RuntimeState* state, doris::Block* block, int batch_size,
                                         bool* eos) {
    Status read_status = get_next(state, block, eos);
    return read_status;
}
// Sorts the rows of `src_block` and writes the sorted result into `dest_block`.
//
// The sort columns are prepared first (see _prepare_sort_columns). When sort
// expressions are materialized, the input of the sort is `dest_block` itself,
// otherwise it is `src_block`. The row limit passed to sort_block() is
// `_offset + _limit`, or 0 when `reversed` is set. After sorting, the column
// data of `src_block` is cleared using the column count it had on entry.
Status Sorter::partial_sort(Block& src_block, Block& dest_block, bool reversed) {
    const size_t entry_column_count = src_block.columns();
    RETURN_IF_ERROR(_prepare_sort_columns(src_block, dest_block, reversed));

    {
        SCOPED_TIMER(_partial_sort_timer);
        uint64_t row_limit = 0;
        if (!reversed) {
            row_limit = _offset + _limit;
        }
        Block& sort_input = _materialize_sort_exprs ? dest_block : src_block;
        sort_block(sort_input, dest_block, _sort_description, _hybrid_sorter, row_limit);
    }

    src_block.clear_column_data(entry_column_count);
    return Status::OK();
}
// Prepares the inputs for a sort of `src_block`.
//
// 1. When sort expressions are materialized, every output tuple expression is
//    evaluated on `src_block` and the resulting columns replace the contents of
//    `dest_block`.
// 2. `_sort_description` is resized to one entry per ordering expression. Each
//    ordering expression is executed on the block that will be sorted
//    (`dest_block` when materialized, `src_block` otherwise) to obtain the
//    column number, and direction / nulls_direction are filled in from
//    `_is_asc_order` and `_nulls_first`.
//
// When `reversed` is true only `direction` is flipped; `nulls_direction` keeps
// the value derived from the un-reversed direction.
//
// Returns the first error reported by an expression, otherwise Status::OK().
Status Sorter::_prepare_sort_columns(Block& src_block, Block& dest_block, bool reversed) {
    if (_materialize_sort_exprs) {
        // Bind by reference: the contexts are only read here, so copying the
        // whole container on every call is unnecessary.
        const auto& output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
        ColumnsWithTypeAndName columns_data(output_tuple_expr_ctxs.size());
        for (size_t i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
            RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block, columns_data[i]));
        }
        Block new_block {columns_data};
        dest_block.swap(new_block);
    }

    const auto& ordering_expr_ctxs = _vsort_exec_exprs.ordering_expr_ctxs();
    _sort_description.resize(ordering_expr_ctxs.size());

    Block* result_block = _materialize_sort_exprs ? &dest_block : &src_block;
    for (size_t i = 0; i < _sort_description.size(); ++i) {
        const auto& ordering_expr = ordering_expr_ctxs[i];
        RETURN_IF_ERROR(ordering_expr->execute(result_block, &_sort_description[i].column_number));

        _sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
        _sort_description[i].nulls_direction =
                _nulls_first[i] ? -_sort_description[i].direction : _sort_description[i].direction;
        if (reversed) {
            _sort_description[i].direction *= -1;
        }
    }
    return Status::OK();
}
// Constructs a FullSorter.
//
// The sort expressions, runtime state, limit, offset, object pool and the
// per-key ascending / nulls-first flags are forwarded to the Sorter base class.
// The merge state `_state` is created from `row_desc` and `offset`.
// `profile` is not used by this constructor.
FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t offset,
ObjectPool* pool, std::vector<bool>& is_asc_order,
std::vector<bool>& nulls_first, const RowDescriptor& row_desc,
RuntimeState* state, RuntimeProfile* profile)
: Sorter(vsort_exec_exprs, state, limit, offset, pool, is_asc_order, nulls_first),
_state(MergeSorterState::create_unique(row_desc, offset)) {}
// Returns true when every column of `unsorted_block` reports, via
// IColumn::has_enough_capacity(), that it can take the matching column of
// `input_block`; returns false as soon as one column cannot.
// Both blocks are expected to have the same number of columns.
bool FullSorter::has_enough_capacity(Block* input_block, Block* unsorted_block) const {
    DCHECK_EQ(input_block->columns(), unsorted_block->columns());
    for (auto idx = 0; idx < input_block->columns(); ++idx) {
        const auto& buffered_column = unsorted_block->get_by_position(idx).column;
        const auto& incoming_column = input_block->get_by_position(idx).column;
        if (buffered_column->has_enough_capacity(*incoming_column)) {
            continue;
        }
        return false;
    }
    return true;
}
// Estimates how many bytes should be reserved before the next input block is
// appended to the unsorted block.
//
// Returns 0 when the unsorted block currently holds no rows. Otherwise the
// estimate is the sum of:
//  - 1.15 x the currently allocated bytes, if the projected size after one more
//    batch reaches 85% of the allocated bytes;
//  - if the projected rows or bytes exceed the buffering thresholds (so a sort
//    is expected): the average size of one column, one permutation entry per
//    row, and, when there is more than one ordering expression, one
//    EqualRangeIterator per row.
//
// `eos` is not used by this estimate.
size_t FullSorter::get_reserve_mem_size(RuntimeState* state, bool eos) const {
    size_t size_to_reserve = 0;
    const auto rows = _state->unsorted_block()->rows();
    if (rows == 0) {
        return size_to_reserve;
    }

    const auto bytes = _state->unsorted_block()->bytes();
    const auto allocated_bytes = _state->unsorted_block()->allocated_bytes();
    const auto bytes_per_row = bytes / rows;
    const auto estimated_size_of_next_block = bytes_per_row * state->batch_size();
    auto new_block_bytes = estimated_size_of_next_block + bytes;
    auto new_rows = rows + state->batch_size();

    // If the new size is greater than 85% of allocated bytes, it may need to realloc.
    // The explicit zero check avoids an integer division by zero; in that case
    // the term added below is 0.
    if (allocated_bytes == 0 || (new_block_bytes * 100 / allocated_bytes) >= 85) {
        size_to_reserve += static_cast<size_t>(allocated_bytes * 1.15);
    }

    const bool sort = new_rows > _buffered_block_size || new_block_bytes > _buffered_block_bytes;
    if (sort) {
        // new column is created when doing sort, reserve average size of one column
        // for estimation
        size_to_reserve += new_block_bytes / _state->unsorted_block()->columns();

        // helping data structures used during sorting
        size_to_reserve += new_rows * sizeof(IColumn::Permutation::value_type);

        auto sort_columns_count = _vsort_exec_exprs.ordering_expr_ctxs().size();
        if (1 != sort_columns_count) {
            size_to_reserve += new_rows * sizeof(EqualRangeIterator);
        }
    }
    return size_to_reserve;
}
// Appends all rows of `block` to the unsorted block held by `_state`, then
// clears the column data of `block`.
//
// Before appending, the buffered data is sorted via do_sort() if and only if
// _reach_limit() is true and the unsorted block lacks the capacity to take
// `block`. Constant input columns are appended by repeating row 0 of their
// nested data column; other columns are appended as a range.
//
// Returns the error from do_sort() if it fails, otherwise Status::OK().
Status FullSorter::append_block(Block* block) {
    DCHECK(block->rows() > 0);

    const bool must_sort_first =
            _reach_limit() && !has_enough_capacity(block, _state->unsorted_block().get());
    if (must_sort_first) {
        RETURN_IF_ERROR(do_sort());
    }

    {
        SCOPED_TIMER(_merge_block_timer);
        const auto& buffered = _state->unsorted_block()->get_columns_with_type_and_name();
        const auto& incoming = block->get_columns_with_type_and_name();
        auto incoming_rows = block->rows();

        for (int i = 0; i < buffered.size(); ++i) {
            const auto& target = buffered[i];
            const auto& source = incoming[i];
            DCHECK(target.type->equals(*(source.type)))
                    << " type1: " << target.type->get_name()
                    << " type2: " << source.type->get_name() << " i: " << i;

            if (!is_column_const(*source.column)) {
                target.column->assume_mutable()->insert_range_from(*source.column, 0,
                                                                   incoming_rows);
                continue;
            }
            // A constant column stores a single value: replicate it for every row.
            const auto* const_column = assert_cast<const ColumnConst*>(source.column.get());
            target.column->assume_mutable()->insert_many_from(const_column->get_data_column(), 0,
                                                              incoming_rows);
        }
        block->clear_column_data();
    }
    return Status::OK();
}
// Finishes the input phase and builds the merge tree used for reading.
//
// For the spill path (`is_spill` true) the offset is folded into the limit,
// `_offset` is reset to 0, and the merge state is told to ignore its offset.
// Rows still sitting in the unsorted block are sorted first.
//
// Returns the error from do_sort() if it fails, otherwise the status of
// build_merge_tree().
Status FullSorter::prepare_for_read(bool is_spill) {
    if (is_spill) {
        _limit += _offset;
        _offset = 0;
        _state->ignore_offset();
    }

    const bool has_unsorted_rows = _state->unsorted_block()->rows() > 0;
    if (has_unsorted_rows) {
        RETURN_IF_ERROR(do_sort());
    }

    Status build_status = _state->build_merge_tree(_sort_description);
    return build_status;
}
// Reads the next batch of sorted rows into `block`, using the batch size
// configured on `state`. `*eos` is set by the merge state.
Status FullSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
    Status read_status = _state->merge_sort_read(block, state->batch_size(), eos);
    return read_status;
}
// Spill-path read: like get_next(), but the caller chooses `batch_size`
// explicitly. `state` is not used by this override.
Status FullSorter::merge_sort_read_for_spill(RuntimeState* state, doris::Block* block,
                                             int batch_size, bool* eos) {
    Status read_status = _state->merge_sort_read(block, batch_size, eos);
    return read_status;
}
// Sorts the currently buffered unsorted block and hands the sorted result to
// the merge state.
//
// Normal sort (no limit, or spill enabled): the sorted block is always kept.
//
// TOP-N (limit set and spill disabled): a small optimisation reduces memory
// usage. `_block_priority_queue` orders the kept blocks; once enough rows have
// been collected (`num_rows() >= _offset + _limit`), a new block that is
// totally greater than the queue top is dropped. Otherwise the block is kept
// and a cursor over the last sorted block is pushed onto the queue.
//
// Returns the error from partial_sort() if it fails, otherwise Status::OK().
Status FullSorter::do_sort() {
    Block* unsorted = _state->unsorted_block().get();
    Block sorted_result = unsorted->clone_without_columns();
    COUNTER_UPDATE(_partial_sort_counter, 1);
    RETURN_IF_ERROR(partial_sort(*unsorted, sorted_result));

    const bool is_in_memory_topn = _limit != -1 && !_enable_spill;
    if (!is_in_memory_topn) {
        // dispose normal sort logic
        _state->add_sorted_block(Block::create_shared(std::move(sorted_result)));
        return Status::OK();
    }

    // dispose TOP-N logic
    auto sorted_block = Block::create_shared(std::move(sorted_result));
    if (!(_state->num_rows() < _offset + _limit)) {
        auto candidate_impl = MergeSortCursorImpl::create_shared(sorted_block, _sort_description);
        MergeSortBlockCursor candidate(candidate_impl);
        if (candidate.totally_greater(_block_priority_queue.top())) {
            // Every row sorts after the current heap top: throw the block away.
            return Status::OK();
        }
    }

    _state->add_sorted_block(sorted_block);
    _block_priority_queue.emplace(
            MergeSortCursorImpl::create_shared(_state->last_sorted_block(), _sort_description));
    return Status::OK();
}
// Returns the data size reported by the underlying merge state.
size_t FullSorter::data_size() const {
    const size_t state_bytes = _state->data_size();
    return state_bytes;
}
// Resets the sorter by delegating to the reset() of the underlying merge state.
void FullSorter::reset() {
_state->reset();
}
} // namespace doris