| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/sort/sorter.h" |
| |
| #include <glog/logging.h> |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <functional> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| |
| #include "common/object_pool.h" |
| #include "core/block/block.h" |
| #include "core/block/column_with_type_and_name.h" |
| #include "core/column/column.h" |
| #include "core/column/column_nullable.h" |
| #include "core/data_type/data_type.h" |
| #include "core/data_type/data_type_nullable.h" |
| #include "exec/common/util.hpp" |
| #include "exec/sort/sort_block.h" |
| #include "exprs/vexpr_context.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/runtime_profile.h" |
| #include "runtime/thread_context.h" |
| |
| namespace doris { |
| class RowDescriptor; |
| } // namespace doris |
| |
| namespace doris { |
| |
| // When doing spillable sorting, each sorted block is spilled into a single file. |
| // |
| // In order to decrease memory pressure when merging |
| // multiple spilled blocks into one bigger sorted block, only part |
| // of each spilled block is read back into memory at a time. |
| // |
| // Currently each spilled block is split into small sub-blocks; |
| // each sub-block is serialized in PBlock format and appended |
| // to the spill file. |
| // |
| |
| // Discards every in-memory sorted block, replaces the unsorted block with an |
| // empty block obtained from clone_empty(), and zeroes the in-memory |
| // sorted-bytes counter. |
| // NOTE(review): _num_rows and _queue are not touched here - confirm that is intended. |
| void MergeSorterState::reset() { |
|     // Swap with a fresh vector instead of calling clear(): the old vector, |
|     // including its buffer, is destroyed when `released` goes out of scope. |
|     std::vector<std::shared_ptr<Block>> released; |
|     _sorted_blocks.swap(released); |
|     unsorted_block() = Block::create_unique(unsorted_block()->clone_empty()); |
|     _in_mem_sorted_bocks_size = 0; |
| } |
| |
| // Appends `block` to the list of in-memory sorted blocks and updates the byte |
| // and row counters. A block with zero rows is ignored: nothing is stored and |
| // no counter changes. |
| void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) { |
|     const auto rows = block->rows(); |
|     if (rows == 0) { |
|         return; |
|     } |
|     // Read the size before the pointer is moved away below. |
|     _in_mem_sorted_bocks_size += block->bytes(); |
|     // `block` is a by-value sink parameter, so move it into the vector rather |
|     // than copying the shared_ptr (which costs an extra refcount round trip). |
|     _sorted_blocks.emplace_back(std::move(block)); |
|     _num_rows += rows; |
| } |
| |
| // Builds the merge queue from all in-memory sorted blocks. |
| // Each block pointer is moved into a new MergeSortCursorImpl created with |
| // `sort_description`; afterwards _sorted_blocks is cleared. |
| // Always returns Status::OK(). |
| Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) { |
|     std::vector<MergeSortCursor> cursors; |
|     // Exactly one cursor is created per sorted block, so size the vector once. |
|     cursors.reserve(_sorted_blocks.size()); |
|     for (auto& block : _sorted_blocks) { |
|         cursors.emplace_back( |
|                 MergeSortCursorImpl::create_shared(std::move(block), sort_description)); |
|     } |
|     _queue = MergeSorterQueue(cursors); |
| |
|     // The entries were moved from above; drop them. |
|     _sorted_blocks.clear(); |
|     return Status::OK(); |
| } |
| |
| // Reads the next batch of merged rows into `block` and sets `*eos`. |
| // The two DCHECKs assert that no sorted blocks are still pending in |
| // _sorted_blocks and that the unsorted block is empty before reading. |
| // The actual work is done by _merge_sort_read_impl(); this function always |
| // returns Status::OK(). |
| Status MergeSorterState::merge_sort_read(doris::Block* block, int batch_size, bool* eos) { |
| DCHECK(_sorted_blocks.empty()); |
| DCHECK(unsorted_block()->empty()); |
| _merge_sort_read_impl(batch_size, block, eos); |
| return Status::OK(); |
| } |
| |
| // Pulls rows from the merge queue in sorted order and appends them to `block` |
| // until `batch_size` rows were produced or the queue is exhausted. |
| // Rows are first consumed against the remaining _offset (skipped, not emitted). |
| // `*eos` is set to true only when this call produced no rows at all. |
| void MergeSorterState::_merge_sort_read_impl(int batch_size, doris::Block* block, bool* eos) { |
|     const size_t column_count = unsorted_block()->columns(); |
| |
|     MutableBlock output = VectorizedUtils::build_mutable_mem_reuse_block(block, *unsorted_block()); |
|     MutableColumns& out_columns = output.mutable_columns(); |
| |
|     size_t rows_emitted = 0; |
|     while (_queue.is_valid() && rows_emitted < batch_size) { |
|         auto [cursor, available] = _queue.current(); |
|         // Never take more than what is still missing from this batch. |
|         available = std::min(available, batch_size - rows_emitted); |
| |
|         // Part (or all) of this run may fall inside the offset that is still to be skipped. |
|         const size_t skipped = std::min(_offset, available); |
|         _offset -= skipped; |
|         available -= skipped; |
| |
|         if (available != 0) { |
|             const auto first_row = cursor->impl->pos + skipped; |
|             for (size_t col = 0; col < column_count; ++col) { |
|                 out_columns[col]->insert_range_from(*cursor->impl->columns[col], first_row, |
|                                                     available); |
|             } |
|             rows_emitted += available; |
|         } |
| |
|         // Advance past everything consumed from this cursor: skipped plus emitted rows. |
|         const size_t consumed = available + skipped; |
|         if (cursor->impl->is_last(consumed)) { |
|             _queue.remove_top(); |
|         } else { |
|             _queue.next(consumed); |
|         } |
|     } |
| |
|     block->set_columns(std::move(out_columns)); |
|     *eos = (rows_emitted == 0); |
| } |
| |
| // Sorter's implementation of the spill read path: forwards to get_next(). |
| // The `batch_size` argument is not used here; only `state`, `block` and `eos` |
| // are passed on. |
| Status Sorter::merge_sort_read_for_spill(RuntimeState* state, doris::Block* block, int batch_size, |
| bool* eos) { |
| return get_next(state, block, eos); |
| } |
| |
| // Sorts one block. |
| // _prepare_sort_columns() fills _sort_description (and, when |
| // _materialize_sort_exprs is set, replaces `dest_block`); sort_block() then |
| // writes the sorted result into `dest_block`. Finally the column data of |
| // `src_block` is cleared, using the column count it had on entry. |
| // Returns the error from _prepare_sort_columns() if it fails, otherwise OK. |
| Status Sorter::partial_sort(Block& src_block, Block& dest_block, bool reversed) { |
|     const size_t src_column_count = src_block.columns(); |
|     RETURN_IF_ERROR(_prepare_sort_columns(src_block, dest_block, reversed)); |
|     { |
|         SCOPED_TIMER(_partial_sort_timer); |
|         // A reversed sort passes a limit of 0; otherwise offset + limit rows are requested. |
|         // NOTE(review): FullSorter::do_sort() treats _limit == -1 as "no limit"; here that |
|         // sentinel is added to _offset before the conversion to uint64_t - confirm that |
|         // sort_block() handles the resulting value as intended. |
|         uint64_t limit = 0; |
|         if (!reversed) { |
|             limit = _offset + _limit; |
|         } |
|         Block& sort_input = _materialize_sort_exprs ? dest_block : src_block; |
|         sort_block(sort_input, dest_block, _sort_description, _hybrid_sorter, limit); |
|     } |
| |
|     src_block.clear_column_data(src_column_count); |
|     return Status::OK(); |
| } |
| |
| // Prepares the input of a sort and fills _sort_description. |
| // |
| // When _materialize_sort_exprs is set, every sort-tuple slot expression is |
| // executed against `src_block` and `dest_block` is replaced by a block built |
| // from the resulting columns. The ordering expressions are then executed |
| // against that block (or against `src_block` otherwise) to obtain the column |
| // number of each sort key. |
| // |
| // For each key: direction is 1 for ascending and -1 for descending; |
| // nulls_direction is the opposite of direction when nulls come first and equal |
| // to it otherwise. If `reversed` is set, direction is flipped afterwards while |
| // nulls_direction keeps the value computed from the unflipped direction. |
| // |
| // Returns the first error reported by an expression, otherwise Status::OK(). |
| Status Sorter::_prepare_sort_columns(Block& src_block, Block& dest_block, bool reversed) { |
|     if (_materialize_sort_exprs) { |
|         // Bind by reference: the contexts are only read, no need to copy the vector. |
|         const auto& output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs(); |
|         ColumnsWithTypeAndName columns_data(output_tuple_expr_ctxs.size()); |
|         for (size_t i = 0; i < output_tuple_expr_ctxs.size(); ++i) { |
|             RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block, columns_data[i])); |
|         } |
| |
|         Block new_block {columns_data}; |
|         dest_block.swap(new_block); |
|     } |
| |
|     const auto& ordering_expr_ctxs = _vsort_exec_exprs.ordering_expr_ctxs(); |
|     _sort_description.resize(ordering_expr_ctxs.size()); |
|     Block* result_block = _materialize_sort_exprs ? &dest_block : &src_block; |
|     for (size_t i = 0; i < _sort_description.size(); ++i) { |
|         auto& key = _sort_description[i]; |
|         RETURN_IF_ERROR(ordering_expr_ctxs[i]->execute(result_block, &key.column_number)); |
| |
|         key.direction = _is_asc_order[i] ? 1 : -1; |
|         key.nulls_direction = _nulls_first[i] ? -key.direction : key.direction; |
|         if (reversed) { |
|             key.direction *= -1; |
|         } |
|     } |
|     return Status::OK(); |
| } |
| |
| // Constructs a FullSorter. |
| // Every argument except `row_desc` and `profile` is forwarded to the Sorter |
| // base constructor; `row_desc` and `offset` are used to create the |
| // MergeSorterState held in _state. `profile` is not referenced here. |
| FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t offset, |
| ObjectPool* pool, std::vector<bool>& is_asc_order, |
| std::vector<bool>& nulls_first, const RowDescriptor& row_desc, |
| RuntimeState* state, RuntimeProfile* profile) |
| : Sorter(vsort_exec_exprs, state, limit, offset, pool, is_asc_order, nulls_first), |
| _state(MergeSorterState::create_unique(row_desc, offset)) {} |
| |
| // Returns true when every column of `unsorted_block` reports (through the |
| // column's own has_enough_capacity()) that it can take the matching column of |
| // `input_block`; returns false as soon as one column cannot. |
| // Both blocks are expected to have the same number of columns (DCHECK only). |
| bool FullSorter::has_enough_capacity(Block* input_block, Block* unsorted_block) const { |
|     DCHECK_EQ(input_block->columns(), unsorted_block->columns()); |
|     const size_t column_count = input_block->columns(); |
|     for (size_t col = 0; col < column_count; ++col) { |
|         const auto& target = unsorted_block->get_by_position(col).column; |
|         const auto& incoming = input_block->get_by_position(col).column; |
|         if (!target->has_enough_capacity(*incoming)) { |
|             return false; |
|         } |
|     } |
|     return true; |
| } |
| |
| // Estimates how many bytes should be reserved before the next input block is |
| // appended to the unsorted block. |
| // |
| // Returns 0 when the unsorted block currently holds no rows. Otherwise the |
| // next block is assumed to carry state->batch_size() rows of the current |
| // average row width, and the estimate is the sum of: |
| //   - 1.15 x the currently allocated bytes, if the grown block would reach |
| //     85% of the allocated bytes (a reallocation may be needed); |
| //   - if the grown block would exceed _buffered_block_size rows or |
| //     _buffered_block_bytes bytes (i.e. a sort would be triggered): the average |
| //     size of one column, one permutation entry per row and, when there is more |
| //     than one ordering expression, one EqualRangeIterator per row. |
| // `eos` is not used. |
| size_t FullSorter::get_reserve_mem_size(RuntimeState* state, bool eos) const { |
|     const auto rows = _state->unsorted_block()->rows(); |
|     if (rows == 0) { |
|         return 0; |
|     } |
| |
|     size_t size_to_reserve = 0; |
|     const auto bytes = _state->unsorted_block()->bytes(); |
|     const auto allocated_bytes = _state->unsorted_block()->allocated_bytes(); |
|     const auto bytes_per_row = bytes / rows; |
|     const auto estimated_size_of_next_block = bytes_per_row * state->batch_size(); |
|     const auto new_block_bytes = estimated_size_of_next_block + bytes; |
|     const auto new_rows = rows + state->batch_size(); |
| |
|     // If the new size reaches 85% of the allocated bytes, a realloc may be needed. |
|     // Written without a division: for a non-zero allocated_bytes this is equivalent to |
|     // (new_block_bytes * 100 / allocated_bytes) >= 85, and it cannot divide by zero. |
|     if (new_block_bytes * 100 >= allocated_bytes * 85) { |
|         size_to_reserve += static_cast<size_t>(allocated_bytes * 1.15); |
|     } |
| |
|     const bool sort = new_rows > _buffered_block_size || new_block_bytes > _buffered_block_bytes; |
|     if (sort) { |
|         // A new column is created when sorting; reserve the average size of one |
|         // column as an estimate. Skipped if the block reports no columns. |
|         const auto column_count = _state->unsorted_block()->columns(); |
|         if (column_count != 0) { |
|             size_to_reserve += new_block_bytes / column_count; |
|         } |
| |
|         // Helper data structures used during sorting. |
|         size_to_reserve += new_rows * sizeof(IColumn::Permutation::value_type); |
| |
|         const auto sort_columns_count = _vsort_exec_exprs.ordering_expr_ctxs().size(); |
|         if (sort_columns_count != 1) { |
|             size_to_reserve += new_rows * sizeof(EqualRangeIterator); |
|         } |
|     } |
|     return size_to_reserve; |
| } |
| |
| // Appends all rows of `block` to the unsorted block, then clears the column |
| // data of `block`. |
| // Before appending, the buffered data is sorted via do_sort() if _reach_limit() |
| // holds and the unsorted block lacks capacity for the incoming block; an error |
| // from do_sort() is returned to the caller. |
| // `block` is expected to contain at least one row (DCHECK only). |
| Status FullSorter::append_block(Block* block) { |
|     DCHECK(block->rows() > 0); |
| |
|     // Sort first only when the limit is reached AND the incoming data does not fit. |
|     if (_reach_limit() && !has_enough_capacity(block, _state->unsorted_block().get())) { |
|         RETURN_IF_ERROR(do_sort()); |
|     } |
| |
|     { |
|         SCOPED_TIMER(_merge_block_timer); |
|         const auto& data = _state->unsorted_block()->get_columns_with_type_and_name(); |
|         const auto& arrival_data = block->get_columns_with_type_and_name(); |
|         const auto sz = block->rows(); |
|         for (size_t i = 0; i < data.size(); ++i) { |
|             DCHECK(data[i].type->equals(*(arrival_data[i].type))) |
|                     << " type1: " << data[i].type->get_name() |
|                     << " type2: " << arrival_data[i].type->get_name() << " i: " << i; |
|             auto target = data[i].column->assume_mutable(); |
|             const auto& source = arrival_data[i].column; |
|             if (!is_column_const(*source)) { |
|                 target->insert_range_from(*source, 0, sz); |
|                 continue; |
|             } |
|             // Const column: repeat row 0 of its nested data column `sz` times. |
|             target->insert_many_from( |
|                     assert_cast<const ColumnConst*>(source.get())->get_data_column(), 0, sz); |
|         } |
|         block->clear_column_data(); |
|     } |
|     return Status::OK(); |
| } |
| |
| // Finishes the input phase and builds the merge queue for reading. |
| // |
| // When `is_spill` is set, the offset is folded into the limit (so that |
| // offset + limit rows are kept), _offset is zeroed and the merge state is told |
| // to ignore its offset. Any rows still buffered in the unsorted block are |
| // sorted first. |
| // Returns the error from do_sort() or build_merge_tree(), otherwise OK. |
| Status FullSorter::prepare_for_read(bool is_spill) { |
|     if (is_spill) { |
|         // -1 is the "no limit" sentinel (see the check in do_sort()); adding the |
|         // offset to it would turn it into a bogus finite limit, so leave it alone. |
|         if (_limit != -1) { |
|             _limit += _offset; |
|         } |
|         _offset = 0; |
|         _state->ignore_offset(); |
|     } |
|     if (_state->unsorted_block()->rows() > 0) { |
|         RETURN_IF_ERROR(do_sort()); |
|     } |
|     return _state->build_merge_tree(_sort_description); |
| } |
| |
| // Reads the next batch of sorted rows into `block`, using the batch size |
| // configured on `state`. `*eos` is set by the merge state. |
| Status FullSorter::get_next(RuntimeState* state, Block* block, bool* eos) { |
|     const auto rows_per_batch = state->batch_size(); |
|     return _state->merge_sort_read(block, rows_per_batch, eos); |
| } |
| |
| // Spill read path: same as get_next(), except that the caller supplies the |
| // batch size explicitly. `state` is not used. |
| Status FullSorter::merge_sort_read_for_spill(RuntimeState* state, doris::Block* block, |
|                                              int batch_size, bool* eos) { |
|     auto& merge_state = *_state; |
|     return merge_state.merge_sort_read(block, batch_size, eos); |
| } |
| |
| // Sorts the rows buffered in the unsorted block and hands the sorted block to |
| // the merge state. |
| // |
| // Without a limit (_limit == -1) or with spill enabled, the sorted block is |
| // always kept. Otherwise (TOP-N) a small optimization reduces memory usage: |
| // kept blocks are also tracked in _block_priority_queue, and once at least |
| // offset + limit rows are held, a new block that is totally greater than the |
| // top of that queue is dropped. |
| // |
| // Returns the error from partial_sort() if it fails, otherwise Status::OK(). |
| // NOTE(review): add_sorted_block() ignores zero-row blocks, and the queue top is read |
| // without an emptiness check - confirm last_sorted_block() / top() are always valid here. |
| Status FullSorter::do_sort() { |
|     Block* unsorted = _state->unsorted_block().get(); |
|     Block sorted = unsorted->clone_without_columns(); |
|     COUNTER_UPDATE(_partial_sort_counter, 1); |
|     RETURN_IF_ERROR(partial_sort(*unsorted, sorted)); |
| |
|     const bool is_top_n = _limit != -1 && !_enable_spill; |
|     if (!is_top_n) { |
|         // Normal sort: keep every sorted block. |
|         _state->add_sorted_block(Block::create_shared(std::move(sorted))); |
|         return Status::OK(); |
|     } |
| |
|     if (_state->num_rows() < _offset + _limit) { |
|         // Not enough rows collected yet: keep the block unconditionally. |
|         _state->add_sorted_block(Block::create_shared(std::move(sorted))); |
|         _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( |
|                 _state->last_sorted_block(), _sort_description)); |
|         return Status::OK(); |
|     } |
| |
|     auto candidate = MergeSortCursorImpl::create_shared( |
|             Block::create_shared(std::move(sorted)), _sort_description); |
|     MergeSortBlockCursor candidate_cursor(candidate); |
|     if (candidate_cursor.totally_greater(_block_priority_queue.top())) { |
|         // The block is totally greater than the queue top: throw its data away. |
|         return Status::OK(); |
|     } |
| |
|     _state->add_sorted_block(candidate->block); |
|     _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( |
|             _state->last_sorted_block(), _sort_description)); |
|     return Status::OK(); |
| } |
| |
| // Returns the data size reported by the underlying merge state. |
| size_t FullSorter::data_size() const { |
|     auto& merge_state = *_state; |
|     return merge_state.data_size(); |
| } |
| |
| // Resets the sorter by resetting the merge state object that _state points to. |
| // This calls reset() on the pointee; _state itself keeps pointing to the same object. |
| void FullSorter::reset() { |
|     auto& merge_state = *_state; |
|     merge_state.reset(); |
| } |
| |
| } // namespace doris |