| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
#include <butil/fast_rand.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/descriptors.pb.h>
#include <gen_cpp/olap_file.pb.h>

#include <cstdint>
#include <deque>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_fwd.h"
| |
| namespace doris { |
| class MemTracker; |
| class SlotDescriptor; |
| class TExprNode; |
| class TabletColumn; |
| class TabletIndex; |
| class TupleDescriptor; |
| |
| struct OlapTableIndexSchema { |
| int64_t index_id; |
| std::vector<SlotDescriptor*> slots; |
| int32_t schema_hash; |
| std::vector<TabletColumn*> columns; |
| std::vector<TabletIndex*> indexes; |
| vectorized::VExprContextSPtr where_clause; |
| |
| void to_protobuf(POlapTableIndexSchema* pindex) const; |
| }; |
| |
| class OlapTableSchemaParam { |
| public: |
| OlapTableSchemaParam() = default; |
| ~OlapTableSchemaParam() noexcept = default; |
| |
| Status init(const TOlapTableSchemaParam& tschema); |
| Status init(const POlapTableSchemaParam& pschema); |
| |
| int64_t db_id() const { return _db_id; } |
| int64_t table_id() const { return _table_id; } |
| int64_t version() const { return _version; } |
| |
| TupleDescriptor* tuple_desc() const { return _tuple_desc; } |
| const std::vector<OlapTableIndexSchema*>& indexes() const { return _indexes; } |
| |
| void to_protobuf(POlapTableSchemaParam* pschema) const; |
| |
| // NOTE: this function is not thread-safe. |
| POlapTableSchemaParam* to_protobuf() const { |
| if (_proto_schema == nullptr) { |
| _proto_schema = _obj_pool.add(new POlapTableSchemaParam()); |
| to_protobuf(_proto_schema); |
| } |
| return _proto_schema; |
| } |
| |
| UniqueKeyUpdateModePB unique_key_update_mode() const { return _unique_key_update_mode; } |
| |
| bool is_partial_update() const { |
| return _unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT; |
| } |
| bool is_fixed_partial_update() const { |
| return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; |
| } |
| bool is_flexible_partial_update() const { |
| return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS; |
| } |
| |
| std::set<std::string> partial_update_input_columns() const { |
| return _partial_update_input_columns; |
| } |
| PartialUpdateNewRowPolicyPB partial_update_new_key_policy() const { |
| return _partial_update_new_row_policy; |
| } |
| std::string auto_increment_coulumn() const { return _auto_increment_column; } |
| int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; } |
| void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; } |
| int64_t timestamp_ms() const { return _timestamp_ms; } |
| void set_nano_seconds(int32_t nano_seconds) { _nano_seconds = nano_seconds; } |
| int32_t nano_seconds() const { return _nano_seconds; } |
| void set_timezone(std::string timezone) { _timezone = timezone; } |
| std::string timezone() const { return _timezone; } |
| bool is_strict_mode() const { return _is_strict_mode; } |
| int32_t sequence_map_col_uid() const { return _sequence_map_col_uid; } |
| std::string debug_string() const; |
| |
| Status init_unique_key_update_mode(const TOlapTableSchemaParam& tschema); |
| |
| private: |
| int64_t _db_id; |
| int64_t _table_id; |
| int64_t _version; |
| |
| TupleDescriptor* _tuple_desc = nullptr; |
| mutable POlapTableSchemaParam* _proto_schema = nullptr; |
| std::vector<OlapTableIndexSchema*> _indexes; |
| mutable ObjectPool _obj_pool; |
| UniqueKeyUpdateModePB _unique_key_update_mode {UniqueKeyUpdateModePB::UPSERT}; |
| PartialUpdateNewRowPolicyPB _partial_update_new_row_policy { |
| PartialUpdateNewRowPolicyPB::APPEND}; |
| std::set<std::string> _partial_update_input_columns; |
| bool _is_strict_mode = false; |
| std::string _auto_increment_column; |
| int32_t _auto_increment_column_unique_id; |
| int64_t _timestamp_ms = 0; |
| int32_t _nano_seconds {0}; |
| std::string _timezone; |
| int32_t _sequence_map_col_uid {-1}; |
| }; |
| |
| using OlapTableIndexTablets = TOlapTableIndexTablets; |
| // struct TOlapTableIndexTablets { |
| // 1: required i64 index_id |
| // 2: required list<i64> tablets |
| // } |
| |
| using BlockRow = std::pair<vectorized::Block*, int32_t>; |
| using BlockRowWithIndicator = |
| std::tuple<vectorized::Block*, int32_t, bool>; // [block, row, is_transformed] |
| |
| struct VOlapTablePartition { |
| int64_t id = 0; |
| BlockRow start_key; |
| BlockRow end_key; |
| std::vector<BlockRow> in_keys; |
| int64_t num_buckets = 0; |
| std::vector<OlapTableIndexTablets> indexes; |
| bool is_mutable; |
| // -1 indicates partition with hash distribution |
| int64_t load_tablet_idx = -1; |
| int total_replica_num = 0; |
| int load_required_replica_num = 0; |
| |
| VOlapTablePartition(vectorized::Block* partition_block) |
| // the default value of partition bound is -1. |
| : start_key {partition_block, -1}, end_key {partition_block, -1} {} |
| }; |
| |
| // this is only used by tablet_sink. so we can assume it's inited by its' descriptor. |
| class VOlapTablePartKeyComparator { |
| public: |
| VOlapTablePartKeyComparator(const std::vector<uint16_t>& slot_locs, |
| const std::vector<uint16_t>& params_locs) |
| : _slot_locs(slot_locs), _param_locs(params_locs) {} |
| |
| // return true if lhs < rhs |
| // 'row' is -1 mean maximal boundary |
| bool operator()(const BlockRowWithIndicator& lhs, const BlockRowWithIndicator& rhs) const; |
| |
| private: |
| const std::vector<uint16_t>& _slot_locs; |
| const std::vector<uint16_t>& _param_locs; |
| }; |
| |
| // store an olap table's tablet information |
| class VOlapTablePartitionParam { |
| public: |
| VOlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam>& schema, |
| const TOlapTablePartitionParam& param); |
| |
| ~VOlapTablePartitionParam(); |
| |
| Status init(); |
| |
| int64_t db_id() const { return _t_param.db_id; } |
| int64_t table_id() const { return _t_param.table_id; } |
| int64_t version() const { return _t_param.version; } |
| |
| // return true if we found this block_row in partition |
| ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row, |
| VOlapTablePartition*& partition) const { |
| auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true}) |
| : _partitions_map->upper_bound(std::tuple {block, row, true}); |
| VLOG_TRACE << "find row " << row << " of\n" |
| << block->dump_data() << "in:\n" |
| << _partition_block.dump_data() << "result line row: " << std::get<1>(it->first); |
| |
| // for list partition it might result in default partition |
| if (_is_in_partition) { |
| partition = (it != _partitions_map->end()) ? it->second : _default_partition; |
| it = _partitions_map->end(); |
| } |
| if (it != _partitions_map->end() && |
| _part_contains(it->second, std::tuple {block, row, true})) { |
| partition = it->second; |
| } |
| return (partition != nullptr); |
| } |
| |
| ALWAYS_INLINE void find_tablets( |
| vectorized::Block* block, const std::vector<uint32_t>& indexes, |
| const std::vector<VOlapTablePartition*>& partitions, |
| std::vector<uint32_t>& tablet_indexes /*result*/, |
| /*TODO: check if flat hash map will be better*/ |
| std::map<VOlapTablePartition*, int64_t>* partition_tablets_buffer = nullptr) const { |
| std::function<uint32_t(vectorized::Block*, uint32_t, const VOlapTablePartition&)> |
| compute_function; |
| if (!_distributed_slot_locs.empty()) { |
| //TODO: refactor by saving the hash values. then we can calculate in columnwise. |
| compute_function = [this](vectorized::Block* block, uint32_t row, |
| const VOlapTablePartition& partition) -> uint32_t { |
| uint32_t hash_val = 0; |
| for (unsigned short _distributed_slot_loc : _distributed_slot_locs) { |
| auto* slot_desc = _slots[_distributed_slot_loc]; |
| auto& column = block->get_by_position(_distributed_slot_loc).column; |
| auto val = column->get_data_at(row); |
| if (val.data != nullptr) { |
| hash_val = RawValue::zlib_crc32(val.data, val.size, |
| slot_desc->type()->get_primitive_type(), |
| hash_val); |
| } else { |
| hash_val = HashUtil::zlib_crc_hash_null(hash_val); |
| } |
| } |
| return hash_val % partition.num_buckets; |
| }; |
| } else { // random distribution |
| compute_function = [](vectorized::Block* block, uint32_t row, |
| const VOlapTablePartition& partition) -> uint32_t { |
| if (partition.load_tablet_idx == -1) { |
| // for compatible with old version, just do random |
| return butil::fast_rand() % partition.num_buckets; |
| } |
| return partition.load_tablet_idx % partition.num_buckets; |
| }; |
| } |
| |
| if (partition_tablets_buffer == nullptr) { |
| for (auto index : indexes) { |
| tablet_indexes[index] = compute_function(block, index, *partitions[index]); |
| } |
| } else { // use buffer |
| for (auto index : indexes) { |
| auto* partition = partitions[index]; |
| if (auto it = partition_tablets_buffer->find(partition); |
| it != partition_tablets_buffer->end()) { |
| tablet_indexes[index] = it->second; // tablet |
| } else { |
| // compute and save in buffer |
| (*partition_tablets_buffer)[partition] = tablet_indexes[index] = |
| compute_function(block, index, *partitions[index]); |
| } |
| } |
| } |
| } |
| |
| const std::vector<VOlapTablePartition*>& get_partitions() const { return _partitions; } |
| |
| // it's same with auto now because we only support transformed partition in auto partition. may expand in future |
| bool is_projection_partition() const { return _is_auto_partition; } |
| bool is_auto_partition() const { return _is_auto_partition; } |
| |
| bool is_auto_detect_overwrite() const { return _is_auto_detect_overwrite; } |
| int64_t get_overwrite_group_id() const { return _overwrite_group_id; } |
| |
| std::vector<uint16_t> get_partition_keys() const { return _partition_slot_locs; } |
| |
| Status add_partitions(const std::vector<TOlapTablePartition>& partitions); |
| // no need to del/reinsert partition keys, but change the link. reset the _partitions items |
| Status replace_partitions(std::vector<int64_t>& old_partition_ids, |
| const std::vector<TOlapTablePartition>& new_partitions); |
| |
| vectorized::VExprContextSPtrs get_part_func_ctx() { return _part_func_ctx; } |
| vectorized::VExprSPtrs get_partition_function() { return _partition_function; } |
| |
| // which will affect _partition_block |
| Status generate_partition_from(const TOlapTablePartition& t_part, |
| VOlapTablePartition*& part_result); |
| |
| void set_transformed_slots(const std::vector<uint16_t>& new_slots) { |
| _transformed_slot_locs = new_slots; |
| } |
| |
| private: |
| Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, BlockRow* part_key); |
| |
| // check if this partition contain this key |
| bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const; |
| |
| // this partition only valid in this schema |
| std::shared_ptr<OlapTableSchemaParam> _schema; |
| TOlapTablePartitionParam _t_param; |
| |
| const std::vector<SlotDescriptor*>& _slots; |
| std::vector<uint16_t> _partition_slot_locs; |
| std::vector<uint16_t> _transformed_slot_locs; |
| std::vector<uint16_t> _distributed_slot_locs; |
| |
| ObjectPool _obj_pool; |
| vectorized::Block _partition_block; |
| std::unique_ptr<MemTracker> _mem_tracker; |
| std::vector<VOlapTablePartition*> _partitions; |
| // For all partition value rows saved in this map, indicator is false. whenever we use a value to find in it, the param is true. |
| // so that we can distinguish which column index to use (origin slots or transformed slots). |
| // For range partition we ONLY SAVE RIGHT ENDS. when we find a part's RIGHT by a value, check if part's left cover it then. |
| std::unique_ptr< |
| std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>> |
| _partitions_map; |
| |
| bool _is_in_partition = false; |
| uint32_t _mem_usage = 0; |
| // only works when using list partition, the resource is owned by _partitions |
| VOlapTablePartition* _default_partition = nullptr; |
| |
| bool _is_auto_partition = false; |
| vectorized::VExprContextSPtrs _part_func_ctx = {nullptr}; |
| vectorized::VExprSPtrs _partition_function = {nullptr}; |
| TPartitionType::type _part_type; // support list or range |
| // "insert overwrite partition(*)", detect which partitions by BE |
| bool _is_auto_detect_overwrite = false; |
| int64_t _overwrite_group_id = 0; |
| }; |
| |
| // indicate where's the tablet and all its replications (node-wise) |
| using TabletLocation = TTabletLocation; |
| // struct TTabletLocation { |
| // 1: required i64 tablet_id |
| // 2: required list<i64> node_ids |
| // } |
| |
| class OlapTableLocationParam { |
| public: |
| OlapTableLocationParam(const TOlapTableLocationParam& t_param) : _t_param(t_param) { |
| for (auto& location : _t_param.tablets) { |
| _tablets.emplace(location.tablet_id, &location); |
| } |
| } |
| |
| int64_t db_id() const { return _t_param.db_id; } |
| int64_t table_id() const { return _t_param.table_id; } |
| int64_t version() const { return _t_param.version; } |
| |
| TabletLocation* find_tablet(int64_t tablet_id) const { |
| auto it = _tablets.find(tablet_id); |
| if (it != std::end(_tablets)) { |
| return it->second; |
| } |
| return nullptr; |
| } |
| |
| void add_locations(std::vector<TTabletLocation>& locations) { |
| for (auto& location : locations) { |
| if (_tablets.find(location.tablet_id) == _tablets.end()) { |
| _tablets[location.tablet_id] = &location; |
| } |
| } |
| } |
| |
| private: |
| TOlapTableLocationParam _t_param; |
| // [tablet_id, tablet]. tablet has id, also. |
| std::unordered_map<int64_t, TabletLocation*> _tablets; |
| }; |
| |
| struct NodeInfo { |
| int64_t id; |
| int64_t option; |
| std::string host; |
| int32_t brpc_port; |
| |
| NodeInfo() = default; |
| |
| NodeInfo(const TNodeInfo& tnode) |
| : id(tnode.id), |
| option(tnode.option), |
| host(tnode.host), |
| brpc_port(tnode.async_internal_port) {} |
| }; |
| |
| class DorisNodesInfo { |
| public: |
| DorisNodesInfo() = default; |
| DorisNodesInfo(const TPaloNodesInfo& t_nodes) { |
| for (const auto& node : t_nodes.nodes) { |
| _nodes.emplace(node.id, node); |
| } |
| } |
| void setNodes(const TPaloNodesInfo& t_nodes) { |
| _nodes.clear(); |
| for (const auto& node : t_nodes.nodes) { |
| _nodes.emplace(node.id, node); |
| } |
| } |
| const NodeInfo* find_node(int64_t id) const { |
| auto it = _nodes.find(id); |
| if (it != std::end(_nodes)) { |
| return &it->second; |
| } |
| return nullptr; |
| } |
| |
| void add_nodes(const std::vector<TNodeInfo>& t_nodes) { |
| for (const auto& node : t_nodes) { |
| const auto* node_info = find_node(node.id); |
| if (node_info == nullptr) { |
| _nodes.emplace(node.id, node); |
| } |
| } |
| } |
| |
| const std::unordered_map<int64_t, NodeInfo>& nodes_info() { return _nodes; } |
| |
| private: |
| std::unordered_map<int64_t, NodeInfo> _nodes; |
| }; |
| |
| } // namespace doris |