blob: aaf2ad5637bd55f652fa2a3803484249dc0f8e89 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/Metrics_types.h>
#include <stddef.h>
#include <stdint.h>
#include <cstddef>
#include <deque>
#include <memory>
#include <queue>
#include <vector>
#include "common/status.h"
#include "core/block/block.h"
#include "core/field.h"
#include "exec/common/util.hpp"
#include "exec/sort/hybrid_sorter.h"
#include "exec/sort/sort_cursor.h"
#include "exec/sort/sort_description.h"
#include "exec/sort/vsort_exec_exprs.h"
#include "exec/sort/vsorted_run_merger.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
namespace doris {
// Paired with the "common/compile_check_end.h" include at the bottom of this
// header; presumably scopes stricter compile-time checks to the declarations
// in between (see that header pair to confirm what is enabled).
#include "common/compile_check_begin.h"
// Forward declarations: this header only uses these types through pointers or
// references (ObjectPool*, const RowDescriptor&), so full definitions are not needed.
class ObjectPool;
class RowDescriptor;
} // namespace doris
namespace doris {
using MergeSorterQueue = SortingQueueBatch<MergeSortCursor>;

// In-memory data of the merge-sort path:
// - a buffer for rows that have not been sorted yet (_unsorted_block);
// - the blocks that are already sorted (_sorted_blocks);
// - the queue and merger used to read those blocks back in order.
// TODO: merge sort is currently the only strategy in use.
class MergeSorterState {
    ENABLE_FACTORY_CREATOR(MergeSorterState);

public:
    // The unsorted buffer is built from `row_desc`. It must have the same
    // structure as the blocks arriving from the child node. Those blocks may
    // omit invalid slots, so create_empty_block is expected to skip them too.
    // `offset` is stored unchanged and can later be cleared via ignore_offset().
    MergeSorterState(const RowDescriptor& row_desc, int64_t offset)
            : _unsorted_block(Block::create_unique(VectorizedUtils::create_empty_block(row_desc))),
              _offset(offset) {}
    ~MergeSorterState() = default;

    // Defined out of line.
    void add_sorted_block(std::shared_ptr<Block> block);
    Status build_merge_tree(const SortDescription& sort_description);
    Status merge_sort_read(doris::Block* block, int batch_size, bool* eos);

    // In-memory footprint: bytes of the unsorted buffer plus the tracked size
    // of the in-memory sorted blocks.
    size_t data_size() const { return _unsorted_block->bytes() + _in_mem_sorted_bocks_size; }

    uint64_t num_rows() const { return _num_rows; }

    // Returns a shared owner of the last entry in the sorted-block list.
    // Precondition: at least one sorted block exists, because
    // std::vector::back() on an empty vector is undefined behavior.
    std::shared_ptr<Block> last_sorted_block() { return _sorted_blocks.back(); }

    // Mutable access to the internal containers.
    std::vector<std::shared_ptr<Block>>& get_sorted_block() { return _sorted_blocks; }
    MergeSorterQueue& get_queue() { return _queue; }
    std::unique_ptr<Block>& unsorted_block() { return _unsorted_block; }

    // Defined out of line.
    void reset();

    // Clears the configured offset by setting it to zero.
    void ignore_offset() { _offset = 0; }

private:
    void _merge_sort_read_impl(int batch_size, doris::Block* block, bool* eos);

    std::unique_ptr<Block> _unsorted_block;
    MergeSorterQueue _queue;
    std::vector<std::shared_ptr<Block>> _sorted_blocks;
    // The "bocks" typo is kept deliberately; out-of-line code may reference this name.
    size_t _in_mem_sorted_bocks_size {0};
    uint64_t _num_rows {0};
    size_t _offset;
    Block _merge_sorted_block;
    std::unique_ptr<VSortedRunMerger> _merger;
};
/// Abstract base class for the sorting engines.
///
/// Subclasses implement the pure-virtual interface append_block() /
/// prepare_for_read() / get_next() / data_size(). This base holds the shared
/// configuration: sort expressions, limit/offset, ordering flags, and the
/// profile counters registered in init_profile().
class Sorter {
public:
    /// @param vsort_exec_exprs ordering expressions. Stored by reference, so it must
    ///        outlive this sorter.
    /// @param state            runtime state. It is dereferenced here only to read
    ///        enable_use_hybrid_sort(), so it must be non-null; it is not retained.
    /// @param limit            value later returned by limit().
    /// @param offset           value later returned by offset().
    /// @param pool             object pool, kept as a non-owning pointer.
    /// @param is_asc_order     ordering flags (presumably one per sort key). Stored
    ///        by reference.
    /// @param nulls_first      null-ordering flags (presumably one per sort key).
    ///        Stored by reference.
    Sorter(VSortExecExprs& vsort_exec_exprs, RuntimeState* state, int64_t limit, int64_t offset,
           ObjectPool* pool, std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first)
            : _vsort_exec_exprs(vsort_exec_exprs),
              _limit(limit),
              _offset(offset),
              _pool(pool),
              _is_asc_order(is_asc_order),
              _nulls_first(nulls_first),
              _materialize_sort_exprs(vsort_exec_exprs.need_materialize_tuple()),
              _hybrid_sorter(state->enable_use_hybrid_sort()) {}

#ifdef BE_TEST
    // Test-only default constructor. The reference members are bound to the
    // mock objects below. Scalar members it does not set (_limit, _offset,
    // _materialize_sort_exprs) take their default member initializers instead
    // of being left indeterminate.
    VSortExecExprs mock_vsort_exec_exprs;
    std::vector<bool> mock_is_asc_order;
    std::vector<bool> mock_nulls_first;
    Sorter()
            : _vsort_exec_exprs(mock_vsort_exec_exprs),
              _is_asc_order(mock_is_asc_order),
              _nulls_first(mock_nulls_first) {}
    SortDescription& get_mutable_sort_description() { return _sort_description; }
    const VSortExecExprs& get_vsort_exec_exprs() const { return _vsort_exec_exprs; }
#endif

    virtual ~Sorter() = default;

    /// Registers the "PartialSortTime" and "MergeBlockTime" timers and the
    /// "PartialSortCounter" counter on @p runtime_profile.
    virtual void init_profile(RuntimeProfile* runtime_profile) {
        _partial_sort_timer = ADD_TIMER(runtime_profile, "PartialSortTime");
        _merge_block_timer = ADD_TIMER(runtime_profile, "MergeBlockTime");
        _partial_sort_counter = ADD_COUNTER(runtime_profile, "PartialSortCounter", TUnit::UNIT);
    }

    virtual Status append_block(Block* block) = 0;
    virtual Status prepare_for_read(bool is_spill) = 0;
    virtual Status get_next(RuntimeState* state, Block* block, bool* eos) = 0;
    virtual size_t data_size() const = 0;

    /// Memory to reserve before further work. The base implementation reserves nothing.
    virtual size_t get_reserve_mem_size(RuntimeState* state, bool eos) const { return 0; }

    // for topn runtime predicate
    const SortDescription& get_sort_description() const { return _sort_description; }

    /// The base implementation returns a NULL Field.
    virtual Field get_top_value() { return Field {PrimitiveType::TYPE_NULL}; }

    virtual Status merge_sort_read_for_spill(RuntimeState* state, doris::Block* block,
                                             int batch_size, bool* eos);

    /// The base implementation is a no-op.
    virtual void reset() {}

    int64_t limit() const { return _limit; }
    int64_t offset() const { return _offset; }
    void set_enable_spill() { _enable_spill = true; }

protected:
    Status partial_sort(Block& src_block, Block& dest_block, bool reversed = false);
    Status _prepare_sort_columns(Block& src_block, Block& dest_block, bool reversed = false);

    bool _enable_spill = false;
    SortDescription _sort_description;
    VSortExecExprs& _vsort_exec_exprs;
    // The default member initializers only take effect for the BE_TEST default
    // constructor, which does not set these members. The production constructor
    // always overrides them in its mem-initializer list.
    int64_t _limit = 0;
    int64_t _offset = 0;
    ObjectPool* _pool = nullptr;
    std::vector<bool>& _is_asc_order;
    std::vector<bool>& _nulls_first;
    RuntimeProfile::Counter* _partial_sort_timer = nullptr;
    RuntimeProfile::Counter* _merge_block_timer = nullptr;
    RuntimeProfile::Counter* _partial_sort_counter = nullptr;
    std::priority_queue<MergeSortBlockCursor> _block_priority_queue;
    // Same rationale as _limit/_offset above: defined even under BE_TEST.
    bool _materialize_sort_exprs = false;
    HybridSorter _hybrid_sorter;
};
class FullSorter final : public Sorter {
ENABLE_FACTORY_CREATOR(FullSorter);
public:
FullSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t offset, ObjectPool* pool,
std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first,
const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile);
~FullSorter() override = default;
Status append_block(Block* block) override;
Status prepare_for_read(bool is_spill) override;
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
size_t data_size() const override;
size_t get_reserve_mem_size(RuntimeState* state, bool eos) const override;
Status merge_sort_read_for_spill(RuntimeState* state, doris::Block* block, int batch_size,
bool* eos) override;
void reset() override;
void set_max_buffered_block_bytes(size_t max_buffered_block_bytes) {
_max_buffered_block_bytes = max_buffered_block_bytes;
}
auto merge_sort_state() { return _state.get(); }
Status do_sort();
private:
bool _reach_limit() {
return _state->unsorted_block()->allocated_bytes() >= _max_buffered_block_bytes;
}
bool has_enough_capacity(Block* input_block, Block* unsorted_block) const;
std::unique_ptr<MergeSorterState> _state;
static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 * 1024 * 1024;
static constexpr size_t SPILL_BUFFERED_BLOCK_SIZE = 4 * 1024 * 1024;
static constexpr size_t SPILL_BUFFERED_BLOCK_BYTES = 256 << 20;
size_t _buffered_block_size = SPILL_BUFFERED_BLOCK_SIZE;
size_t _buffered_block_bytes = SPILL_BUFFERED_BLOCK_BYTES;
size_t _max_buffered_block_bytes = INITIAL_BUFFERED_BLOCK_BYTES;
};
#include "common/compile_check_end.h"
} // namespace doris