blob: 61bbd0303fd128131b9d48b1a4006e01aec85815 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.h
// and modified by Doris
#pragma once
#include <glog/logging.h>
#include <parallel_hashmap/phmap.h>
#include <cstddef>
#include <cstdint>
#include <initializer_list>
#include <list>
#include <memory>
#include <ostream>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "common/be_mock_util.h"
#include "common/exception.h"
#include "common/factory_creator.h"
#include "common/status.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
class SipHash;
namespace doris {
class TupleDescriptor;
class PBlock;
class SlotDescriptor;
namespace segment_v2 {
enum CompressionTypePB : int;
} // namespace segment_v2
namespace vectorized {
/** Container for set of columns for bunch of rows in memory.
* This is unit of data processing.
* Also contains metadata - data types of columns and their names
* (either original names from a table, or generated names during temporary calculations).
* Allows to insert, remove columns in arbitrary position, to change order of columns.
*/
class MutableBlock;
class Block {
ENABLE_FACTORY_CREATOR(Block);
private:
using Container = ColumnsWithTypeAndName;
Container data;
public:
Block() = default;
Block(std::initializer_list<ColumnWithTypeAndName> il);
Block(ColumnsWithTypeAndName data_);
Block(const std::vector<SlotDescriptor*>& slots, size_t block_size);
Block(const std::vector<SlotDescriptor>& slots, size_t block_size);
MOCK_FUNCTION ~Block() = default;
Block(const Block& block) = default;
Block& operator=(const Block& p) = default;
Block(Block&& block) = default;
Block& operator=(Block&& other) = default;
void reserve(size_t count);
// Make sure the nammes is useless when use block
void clear_names();
/// insert the column at the specified position
void insert(size_t position, const ColumnWithTypeAndName& elem);
void insert(size_t position, ColumnWithTypeAndName&& elem);
/// insert the column to the end
void insert(const ColumnWithTypeAndName& elem);
void insert(ColumnWithTypeAndName&& elem);
/// remove the column at the specified position
void erase(size_t position);
/// remove the column at the [start, end)
void erase_tail(size_t start);
/// remove the columns at the specified positions
void erase(const std::set<size_t>& positions);
// T was std::set<int>, std::vector<int>, std::list<int>
template <class T>
void erase_not_in(const T& container) {
Container new_data;
for (auto pos : container) {
new_data.emplace_back(std::move(data[pos]));
}
std::swap(data, new_data);
}
std::unordered_map<std::string, uint32_t> get_name_to_pos_map() const {
std::unordered_map<std::string, uint32_t> name_to_index_map;
for (uint32_t i = 0; i < data.size(); ++i) {
name_to_index_map[data[i].name] = i;
}
return name_to_index_map;
}
/// References are invalidated after calling functions above.
ColumnWithTypeAndName& get_by_position(size_t position) {
DCHECK(data.size() > position)
<< ", data.size()=" << data.size() << ", position=" << position;
return data[position];
}
const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; }
void replace_by_position(size_t position, ColumnPtr&& res) {
this->get_by_position(position).column = std::move(res);
}
void replace_by_position(size_t position, const ColumnPtr& res) {
this->get_by_position(position).column = res;
}
void replace_by_position_if_const(size_t position) {
auto& element = this->get_by_position(position);
element.column = element.column->convert_to_full_column_if_const();
}
ColumnWithTypeAndName& safe_get_by_position(size_t position);
const ColumnWithTypeAndName& safe_get_by_position(size_t position) const;
Container::iterator begin() { return data.begin(); }
Container::iterator end() { return data.end(); }
Container::const_iterator begin() const { return data.begin(); }
Container::const_iterator end() const { return data.end(); }
Container::const_iterator cbegin() const { return data.cbegin(); }
Container::const_iterator cend() const { return data.cend(); }
// Get position of column by name. Returns -1 if there is no column with that name.
// ATTN: this method is O(N). better maintain name -> position map in caller if you need to call it frequently.
int get_position_by_name(const std::string& name) const;
const ColumnsWithTypeAndName& get_columns_with_type_and_name() const;
std::vector<std::string> get_names() const;
DataTypes get_data_types() const;
DataTypePtr get_data_type(size_t index) const {
CHECK(index < data.size());
return data[index].type;
}
/// Returns number of rows from first column in block, not equal to nullptr. If no columns, returns 0.
size_t rows() const;
// Cut the rows in block, use in LIMIT operation
void set_num_rows(size_t length);
// Skip the rows in block, use in OFFSET, LIMIT operation
void skip_num_rows(int64_t& offset);
/// As the assumption we used around, the number of columns won't exceed int16 range. so no need to worry when we
/// assign it to int32.
uint32_t columns() const { return static_cast<uint32_t>(data.size()); }
/// Checks that every column in block is not nullptr and has same number of elements.
void check_number_of_rows(bool allow_null_columns = false) const;
Status check_type_and_column() const;
/// Approximate number of bytes in memory - for profiling and limits.
size_t bytes() const;
std::string columns_bytes() const;
/// Approximate number of allocated bytes in memory - for profiling and limits.
MOCK_FUNCTION size_t allocated_bytes() const;
/** Get a list of column names separated by commas. */
std::string dump_names() const;
std::string dump_types() const;
/** List of names, types and lengths of columns. Designed for debugging. */
std::string dump_structure() const;
/** Get the same block, but empty. */
Block clone_empty() const;
Columns get_columns() const;
Columns get_columns_and_convert();
Block clone_without_columns(const std::vector<int>* column_offset = nullptr) const;
/** Get empty columns with the same types as in block. */
MutableColumns clone_empty_columns() const;
/** Get columns from block for mutation. Columns in block will be nullptr. */
MutableColumns mutate_columns();
/** Replace columns in a block */
void set_columns(MutableColumns&& columns);
Block clone_with_columns(MutableColumns&& columns) const;
void clear();
void swap(Block& other) noexcept;
void swap(Block&& other) noexcept;
// Shuffle columns in place based on the result_column_ids
void shuffle_columns(const std::vector<int>& result_column_ids);
// Default column size = -1 means clear all column in block
// Else clear column [0, column_size) delete column [column_size, data.size)
void clear_column_data(int64_t column_size = -1) noexcept;
MOCK_FUNCTION bool mem_reuse() { return !data.empty(); }
bool is_empty_column() { return data.empty(); }
bool empty() const { return rows() == 0; }
/**
* Updates SipHash of the Block, using update method of columns.
* Returns hash for block, that could be used to differentiate blocks
* with same structure, but different data.
*/
void update_hash(SipHash& hash) const;
/**
* Get block data in string.
* If code is in default_implementation_for_nulls or something likely, type and column's nullity could
* temporarily be not same. set allow_null_mismatch to true to dump it correctly.
*/
std::string dump_data(size_t begin = 0, size_t row_limit = 100,
bool allow_null_mismatch = false) const;
std::string dump_data_json(size_t begin = 0, size_t row_limit = 100,
bool allow_null_mismatch = false) const;
/** Get one line data from block, only use in load data */
std::string dump_one_line(size_t row, int column_end) const;
Status append_to_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const;
// need exception safety
static void filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter,
const IColumn::Filter& filter);
// need exception safety
static void filter_block_internal(Block* block, const IColumn::Filter& filter,
uint32_t column_to_keep);
// need exception safety
static void filter_block_internal(Block* block, const IColumn::Filter& filter);
static Status filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter,
size_t filter_column_id, size_t column_to_keep);
static Status filter_block(Block* block, size_t filter_column_id, size_t column_to_keep);
static void erase_useless_column(Block* block, size_t column_to_keep) {
block->erase_tail(column_to_keep);
}
// serialize block to PBlock
Status serialize(int be_exec_version, PBlock* pblock, size_t* uncompressed_bytes,
size_t* compressed_bytes, int64_t* compress_time,
segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data = false) const;
Status deserialize(const PBlock& pblock, size_t* uncompressed_bytes, int64_t* decompress_time);
std::unique_ptr<Block> create_same_struct_block(size_t size, bool is_reserve = false) const;
/** Compares (*this) n-th row and rhs m-th row.
* Returns negative number, 0, or positive number (*this) n-th row is less, equal, greater than rhs m-th row respectively.
* Is used in sortings.
*
* If one of element's value is NaN or NULLs, then:
* - if nan_direction_hint == -1, NaN and NULLs are considered as least than everything other;
* - if nan_direction_hint == 1, NaN and NULLs are considered as greatest than everything other.
* For example, if nan_direction_hint == -1 is used by descending sorting, NaNs will be at the end.
*
* For non Nullable and non floating point types, nan_direction_hint is ignored.
*/
int compare_at(size_t n, size_t m, const Block& rhs, int nan_direction_hint) const {
DCHECK_EQ(columns(), rhs.columns());
return compare_at(n, m, columns(), rhs, nan_direction_hint);
}
int compare_at(size_t n, size_t m, size_t num_columns, const Block& rhs,
int nan_direction_hint) const {
DCHECK_GE(columns(), num_columns);
DCHECK_GE(rhs.columns(), num_columns);
DCHECK_LE(n, rows());
DCHECK_LE(m, rhs.rows());
for (size_t i = 0; i < num_columns; ++i) {
DCHECK(get_by_position(i).type->equals(*rhs.get_by_position(i).type));
auto res = get_by_position(i).column->compare_at(n, m, *(rhs.get_by_position(i).column),
nan_direction_hint);
if (res) {
return res;
}
}
return 0;
}
int compare_at(size_t n, size_t m, const std::vector<uint32_t>* compare_columns,
const Block& rhs, int nan_direction_hint) const {
DCHECK_GE(columns(), compare_columns->size());
DCHECK_GE(rhs.columns(), compare_columns->size());
DCHECK_LE(n, rows());
DCHECK_LE(m, rhs.rows());
for (auto i : *compare_columns) {
DCHECK(get_by_position(i).type->equals(*rhs.get_by_position(i).type));
auto res = get_by_position(i).column->compare_at(n, m, *(rhs.get_by_position(i).column),
nan_direction_hint);
if (res) {
return res;
}
}
return 0;
}
//note(wb) no DCHECK here, because this method is only used after compare_at now, so no need to repeat check here.
// If this method is used in more places, you can add DCHECK case by case.
int compare_column_at(size_t n, size_t m, size_t col_idx, const Block& rhs,
int nan_direction_hint) const {
auto res = get_by_position(col_idx).column->compare_at(
n, m, *(rhs.get_by_position(col_idx).column), nan_direction_hint);
return res;
}
// for String type or Array<String> type
void shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx);
void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
bool need_keep_first);
private:
void erase_impl(size_t position);
};
using Blocks = std::vector<Block>;
using BlocksList = std::list<Block>;
using BlocksPtr = std::shared_ptr<Blocks>;
using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>;
class MutableBlock {
ENABLE_FACTORY_CREATOR(MutableBlock);
private:
MutableColumns _columns;
DataTypes _data_types;
std::vector<std::string> _names;
public:
static MutableBlock build_mutable_block(Block* block) {
return block == nullptr ? MutableBlock() : MutableBlock(block);
}
MutableBlock() = default;
~MutableBlock() = default;
MutableBlock(Block* block)
: _columns(block->mutate_columns()),
_data_types(block->get_data_types()),
_names(block->get_names()) {}
MutableBlock(Block&& block)
: _columns(block.mutate_columns()),
_data_types(block.get_data_types()),
_names(block.get_names()) {}
void operator=(MutableBlock&& m_block) {
_columns = std::move(m_block._columns);
_data_types = std::move(m_block._data_types);
_names = std::move(m_block._names);
}
size_t rows() const;
size_t columns() const { return _columns.size(); }
bool empty() const { return rows() == 0; }
MutableColumns& mutable_columns() { return _columns; }
void set_mutable_columns(MutableColumns&& columns) { _columns = std::move(columns); }
DataTypes& data_types() { return _data_types; }
MutableColumnPtr& get_column_by_position(size_t position) { return _columns[position]; }
const MutableColumnPtr& get_column_by_position(size_t position) const {
return _columns[position];
}
DataTypePtr& get_datatype_by_position(size_t position) { return _data_types[position]; }
const DataTypePtr& get_datatype_by_position(size_t position) const {
return _data_types[position];
}
int compare_one_column(size_t n, size_t m, size_t column_id, int nan_direction_hint) const {
DCHECK_LE(column_id, columns());
DCHECK_LE(n, rows());
DCHECK_LE(m, rows());
auto& column = get_column_by_position(column_id);
return column->compare_at(n, m, *column, nan_direction_hint);
}
int compare_at(size_t n, size_t m, size_t num_columns, const MutableBlock& rhs,
int nan_direction_hint) const {
DCHECK_GE(columns(), num_columns);
DCHECK_GE(rhs.columns(), num_columns);
DCHECK_LE(n, rows());
DCHECK_LE(m, rhs.rows());
for (size_t i = 0; i < num_columns; ++i) {
DCHECK(get_datatype_by_position(i)->equals(*rhs.get_datatype_by_position(i)));
auto res = get_column_by_position(i)->compare_at(n, m, *(rhs.get_column_by_position(i)),
nan_direction_hint);
if (res) {
return res;
}
}
return 0;
}
int compare_at(size_t n, size_t m, const std::vector<uint32_t>* compare_columns,
const MutableBlock& rhs, int nan_direction_hint) const {
DCHECK_GE(columns(), compare_columns->size());
DCHECK_GE(rhs.columns(), compare_columns->size());
DCHECK_LE(n, rows());
DCHECK_LE(m, rhs.rows());
for (auto i : *compare_columns) {
DCHECK(get_datatype_by_position(i)->equals(*rhs.get_datatype_by_position(i)));
auto res = get_column_by_position(i)->compare_at(n, m, *(rhs.get_column_by_position(i)),
nan_direction_hint);
if (res) {
return res;
}
}
return 0;
}
std::string dump_types() const {
std::string res;
for (auto type : _data_types) {
if (!res.empty()) {
res += ", ";
}
res += type->get_name();
}
return res;
}
template <typename T>
[[nodiscard]] Status merge(T&& block) {
RETURN_IF_CATCH_EXCEPTION(return merge_impl(block););
}
template <typename T>
[[nodiscard]] Status merge_ignore_overflow(T&& block) {
RETURN_IF_CATCH_EXCEPTION(return merge_impl_ignore_overflow(block););
}
// only use for join. call ignore_overflow to prevent from throw exception in join
template <typename T>
[[nodiscard]] Status merge_impl_ignore_overflow(T&& block) {
if (_columns.size() != block.columns()) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"Merge block not match, self column count: {}, [columns: {}, types: {}], "
"input column count: {}, [columns: {}, "
"types: {}], ",
_columns.size(), dump_names(), dump_types(), block.columns(),
block.dump_names(), block.dump_types());
}
for (int i = 0; i < _columns.size(); ++i) {
if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
throw doris::Exception(doris::ErrorCode::FATAL_ERROR,
"Merge block not match, self:[columns: {}, types: {}], "
"input:[columns: {}, types: {}], ",
dump_names(), dump_types(), block.dump_names(),
block.dump_types());
}
_columns[i]->insert_range_from_ignore_overflow(
*block.get_by_position(i).column->convert_to_full_column_if_const().get(), 0,
block.rows());
}
return Status::OK();
}
template <typename T>
[[nodiscard]] Status merge_impl(T&& block) {
// merge is not supported in dynamic block
if (_columns.empty() && _data_types.empty()) {
_data_types = block.get_data_types();
_names = block.get_names();
_columns.resize(block.columns());
for (size_t i = 0; i < block.columns(); ++i) {
if (block.get_by_position(i).column) {
_columns[i] = (*std::move(block.get_by_position(i)
.column->convert_to_full_column_if_const()))
.mutate();
} else {
_columns[i] = _data_types[i]->create_column();
}
}
} else {
if (_columns.size() != block.columns()) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"Merge block not match, self column count: {}, [columns: {}, types: {}], "
"input column count: {}, [columns: {}, "
"types: {}], ",
_columns.size(), dump_names(), dump_types(), block.columns(),
block.dump_names(), block.dump_types());
}
for (int i = 0; i < _columns.size(); ++i) {
if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
DCHECK(_data_types[i]->is_nullable())
<< " target type: " << _data_types[i]->get_name()
<< " src type: " << block.get_by_position(i).type->get_name();
DCHECK(((DataTypeNullable*)_data_types[i].get())
->get_nested_type()
->equals(*block.get_by_position(i).type));
DCHECK(!block.get_by_position(i).type->is_nullable());
_columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column)
->convert_to_full_column_if_const(),
0, block.rows());
} else {
_columns[i]->insert_range_from(
*block.get_by_position(i)
.column->convert_to_full_column_if_const()
.get(),
0, block.rows());
}
}
}
return Status::OK();
}
// move to columns' data to a Block. this will invalidate
Block to_block(int start_column = 0);
Block to_block(int start_column, int end_column);
void swap(MutableBlock& other) noexcept;
void add_row(const Block* block, int row);
// Batch add row should return error status if allocate memory failed.
Status add_rows(const Block* block, const uint32_t* row_begin, const uint32_t* row_end,
const std::vector<int>* column_offset = nullptr);
Status add_rows(const Block* block, size_t row_begin, size_t length);
Status add_rows(const Block* block, const std::vector<int64_t>& rows);
std::string dump_data(size_t row_limit = 100) const;
std::string dump_data_json(size_t row_limit = 100) const;
void clear() {
_columns.clear();
_data_types.clear();
_names.clear();
}
// columns resist. columns' inner data removed.
void clear_column_data() noexcept;
size_t allocated_bytes() const;
size_t bytes() const {
size_t res = 0;
for (const auto& elem : _columns) {
res += elem->byte_size();
}
return res;
}
std::vector<std::string>& get_names() { return _names; }
/** Get a list of column names separated by commas. */
std::string dump_names() const;
};
struct IteratorRowRef {
std::shared_ptr<Block> block;
int row_pos;
bool is_same;
template <typename T>
int compare(const IteratorRowRef& rhs, const T& compare_arguments) const {
return block->compare_at(row_pos, rhs.row_pos, compare_arguments, *rhs.block, -1);
}
void reset() {
block = nullptr;
row_pos = -1;
is_same = false;
}
};
using BlockView = std::vector<IteratorRowRef>;
using BlockUPtr = std::unique_ptr<Block>;
} // namespace vectorized
} // namespace doris