| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <butil/macros.h> |
| #include <fmt/format.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <gen_cpp/types.pb.h> |
| #include <stddef.h> |
| #include <stdint.h> |
| |
| #include <atomic> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <string> |
| #include <vector> |
| |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "olap/metadata_adder.h" |
| #include "olap/olap_common.h" |
| #include "olap/rowset/rowset_meta.h" |
| #include "olap/tablet_schema.h" |
| |
| namespace doris { |
| |
| class Rowset; |
| |
| namespace io { |
| class RemoteFileSystem; |
| } // namespace io |
| |
| using RowsetSharedPtr = std::shared_ptr<Rowset>; |
| class RowsetReader; |
| |
| // the rowset state transfer graph: |
| // ROWSET_UNLOADED <--| |
| // ↓ | |
| // ROWSET_LOADED | |
| // ↓ | |
| // ROWSET_UNLOADING -->| |
| enum RowsetState { |
| // state for new created rowset |
| ROWSET_UNLOADED, |
| // state after load() called |
| ROWSET_LOADED, |
| // state for closed() called but owned by some readers |
| ROWSET_UNLOADING |
| }; |
| |
| class RowsetStateMachine { |
| public: |
| RowsetStateMachine() : _rowset_state(ROWSET_UNLOADED) {} |
| |
| Status on_load() { |
| switch (_rowset_state) { |
| case ROWSET_UNLOADED: |
| _rowset_state = ROWSET_LOADED; |
| break; |
| |
| default: |
| return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>( |
| "RowsetStateMachine meet invalid state"); |
| } |
| return Status::OK(); |
| } |
| |
| Status on_close(uint64_t refs_by_reader) { |
| switch (_rowset_state) { |
| case ROWSET_LOADED: |
| if (refs_by_reader == 0) { |
| _rowset_state = ROWSET_UNLOADED; |
| } else { |
| _rowset_state = ROWSET_UNLOADING; |
| } |
| break; |
| |
| default: |
| return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>( |
| "RowsetStateMachine meet invalid state"); |
| } |
| return Status::OK(); |
| } |
| |
| Status on_release() { |
| switch (_rowset_state) { |
| case ROWSET_UNLOADING: |
| _rowset_state = ROWSET_UNLOADED; |
| break; |
| |
| default: |
| return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>( |
| "RowsetStateMachine meet invalid state"); |
| } |
| return Status::OK(); |
| } |
| |
| RowsetState rowset_state() { return _rowset_state; } |
| |
| private: |
| RowsetState _rowset_state; |
| }; |
| |
| class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder<Rowset> { |
| public: |
| // Open all segment files in this rowset and load necessary metadata. |
| // - `use_cache` : whether to use fd cache, only applicable to alpha rowset now |
| // |
| // May be called multiple times, subsequent calls will no-op. |
| // Derived class implements the load logic by overriding the `do_load_once()` method. |
| Status load(bool use_cache = true); |
| |
| // returns Status::Error<ErrorCode::ROWSET_CREATE_READER>() when failed to create reader |
| virtual Status create_reader(std::shared_ptr<RowsetReader>* result) = 0; |
| |
| const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; } |
| |
| void merge_rowset_meta(const RowsetMeta& other); |
| |
| bool is_pending() const { return _is_pending; } |
| |
| bool is_local() const { return _rowset_meta->is_local(); } |
| |
| const std::string& tablet_path() const { return _tablet_path; } |
| |
| // publish rowset to make it visible to read |
| void make_visible(Version version); |
| void set_version(Version version); |
| const TabletSchemaSPtr& tablet_schema() const { return _schema; } |
| |
| // helper class to access RowsetMeta |
| int64_t start_version() const { return rowset_meta()->version().first; } |
| int64_t end_version() const { return rowset_meta()->version().second; } |
| int64_t index_disk_size() const { return rowset_meta()->index_disk_size(); } |
| int64_t data_disk_size() const { return rowset_meta()->data_disk_size(); } |
| int64_t total_disk_size() const { return rowset_meta()->total_disk_size(); } |
| bool empty() const { return rowset_meta()->empty(); } |
| bool zero_num_rows() const { return rowset_meta()->num_rows() == 0; } |
| size_t num_rows() const { return rowset_meta()->num_rows(); } |
| Version version() const { return rowset_meta()->version(); } |
| RowsetId rowset_id() const { return rowset_meta()->rowset_id(); } |
| int64_t creation_time() const { return rowset_meta()->creation_time(); } |
| PUniqueId load_id() const { return rowset_meta()->load_id(); } |
| int64_t txn_id() const { return rowset_meta()->txn_id(); } |
| int64_t partition_id() const { return rowset_meta()->partition_id(); } |
| // flag for push delete rowset |
| bool delete_flag() const { return rowset_meta()->delete_flag(); } |
| int64_t num_segments() const { return rowset_meta()->num_segments(); } |
| void to_rowset_pb(RowsetMetaPB* rs_meta) const { return rowset_meta()->to_rowset_pb(rs_meta); } |
| RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); } |
| // The writing time of the newest data in rowset, to measure the freshness of a rowset. |
| int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); } |
| bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); } |
| KeysType keys_type() { return _schema->keys_type(); } |
| RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); } |
| bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); } |
| |
| // remove all files in this rowset |
| // TODO should we rename the method to remove_files() to be more specific? |
| virtual Status remove() = 0; |
| |
| // close to clear the resource owned by rowset |
| // including: open files, indexes and so on |
| // NOTICE: can not call this function in multithreads |
| void close() { |
| RowsetState old_state = _rowset_state_machine.rowset_state(); |
| if (old_state != ROWSET_LOADED) { |
| return; |
| } |
| Status st = Status::OK(); |
| { |
| std::lock_guard close_lock(_lock); |
| uint64_t current_refs = _refs_by_reader; |
| old_state = _rowset_state_machine.rowset_state(); |
| if (old_state != ROWSET_LOADED) { |
| return; |
| } |
| if (current_refs == 0) { |
| do_close(); |
| } |
| st = _rowset_state_machine.on_close(current_refs); |
| } |
| if (!st.ok()) { |
| LOG(WARNING) << "state transition failed from:" << _rowset_state_machine.rowset_state(); |
| return; |
| } |
| VLOG_NOTICE << "rowset is close. rowset state from:" << old_state << " to " |
| << _rowset_state_machine.rowset_state() << ", version:" << start_version() |
| << "-" << end_version() << ", tabletid:" << _rowset_meta->tablet_id(); |
| } |
| |
| // hard link all files in this rowset to `dir` to form a new rowset with id `new_rowset_id`. |
| virtual Status link_files_to(const std::string& dir, RowsetId new_rowset_id, |
| size_t new_rowset_start_seg_id = 0, |
| std::set<int64_t>* without_index_uids = nullptr) = 0; |
| |
| virtual Status get_inverted_index_size(int64_t* index_size) = 0; |
| |
| // copy all files to `dir` |
| virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0; |
| |
| virtual Status upload_to(const StorageResource& dest_fs, const RowsetId& new_rowset_id) = 0; |
| |
| virtual Status remove_old_files(std::vector<std::string>* files_to_remove) = 0; |
| |
| virtual Status check_file_exist() = 0; |
| |
| bool need_delete_file() const { return _need_delete_file; } |
| |
| void set_need_delete_file() { _need_delete_file = true; } |
| |
| bool contains_version(Version version) const { |
| return rowset_meta()->version().contains(version); |
| } |
| |
| static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) { |
| return left->end_version() < right->end_version(); |
| } |
| |
| // this function is called by reader to increase reference of rowset |
| void acquire() { ++_refs_by_reader; } |
| |
| void release() { |
| // if the refs by reader is 0 and the rowset is closed, should release the resouce |
| uint64_t current_refs = --_refs_by_reader; |
| if (current_refs == 0 && _rowset_state_machine.rowset_state() == ROWSET_UNLOADING) { |
| { |
| std::lock_guard release_lock(_lock); |
| // rejudge _refs_by_reader because we do not add lock in create reader |
| if (_refs_by_reader == 0 && |
| _rowset_state_machine.rowset_state() == ROWSET_UNLOADING) { |
| // first do close, then change state |
| do_close(); |
| static_cast<void>(_rowset_state_machine.on_release()); |
| } |
| } |
| if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) { |
| VLOG_NOTICE |
| << "close the rowset. rowset state from ROWSET_UNLOADING to ROWSET_UNLOADED" |
| << ", version:" << start_version() << "-" << end_version() |
| << ", tabletid:" << _rowset_meta->tablet_id(); |
| } |
| } |
| } |
| |
| void update_delayed_expired_timestamp(uint64_t delayed_expired_timestamp) { |
| if (delayed_expired_timestamp > _delayed_expired_timestamp) { |
| _delayed_expired_timestamp = delayed_expired_timestamp; |
| } |
| } |
| |
| uint64_t delayed_expired_timestamp() { return _delayed_expired_timestamp; } |
| |
| virtual Status get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) { |
| _rowset_meta->get_segments_key_bounds(segments_key_bounds); |
| return Status::OK(); |
| } |
| |
| // min key of the first segment |
| bool first_key(std::string* min_key) { |
| KeyBoundsPB key_bounds; |
| bool ret = _rowset_meta->get_first_segment_key_bound(&key_bounds); |
| if (!ret) { |
| return false; |
| } |
| *min_key = key_bounds.min_key(); |
| return true; |
| } |
| |
| // max key of the last segment |
| bool last_key(std::string* max_key) { |
| KeyBoundsPB key_bounds; |
| bool ret = _rowset_meta->get_last_segment_key_bound(&key_bounds); |
| if (!ret) { |
| return false; |
| } |
| *max_key = key_bounds.max_key(); |
| return true; |
| } |
| |
| bool is_segments_key_bounds_truncated() const { |
| return _rowset_meta->is_segments_key_bounds_truncated(); |
| } |
| |
| bool check_rowset_segment(); |
| |
| [[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); } |
| |
| // is skip index compaction this time |
| bool is_skip_index_compaction(int32_t column_id) const { |
| return skip_index_compaction.find(column_id) != skip_index_compaction.end(); |
| } |
| |
| // set skip index compaction next time |
| void set_skip_index_compaction(int32_t column_id) { skip_index_compaction.insert(column_id); } |
| |
| std::string get_rowset_info_str(); |
| |
| void clear_cache(); |
| |
| Result<std::string> segment_path(int64_t seg_id); |
| |
| std::vector<std::string> get_index_file_names(); |
| |
| protected: |
| friend class RowsetFactory; |
| |
| DISALLOW_COPY_AND_ASSIGN(Rowset); |
| // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset |
| Rowset(const TabletSchemaSPtr& schema, RowsetMetaSharedPtr rowset_meta, |
| std::string tablet_path); |
| |
| // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset |
| virtual Status init() = 0; |
| |
| // release resources in this api |
| virtual void do_close() = 0; |
| |
| virtual Status check_current_rowset_segment() = 0; |
| |
| virtual void clear_inverted_index_cache() = 0; |
| |
| TabletSchemaSPtr _schema; |
| |
| RowsetMetaSharedPtr _rowset_meta; |
| |
| // Local rowset requires a tablet path to obtain the absolute path on the local fs |
| std::string _tablet_path; |
| |
| // init in constructor |
| bool _is_pending; // rowset is pending iff it's not in visible state |
| bool _is_cumulative; // rowset is cumulative iff it's visible and start version < end version |
| |
| // mutex lock for load/close api because it is costly |
| std::mutex _lock; |
| bool _need_delete_file = false; |
| // variable to indicate how many rowset readers owned this rowset |
| std::atomic<uint64_t> _refs_by_reader; |
| // rowset state machine |
| RowsetStateMachine _rowset_state_machine; |
| std::atomic<uint64_t> _delayed_expired_timestamp = 0; |
| |
| // <column_uniq_id>, skip index compaction |
| std::set<int32_t> skip_index_compaction; |
| }; |
| |
| // `rs_metas` MUST already be sorted by `RowsetMeta::comparator` |
| Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets); |
| |
| } // namespace doris |