| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "olap/tablet_manager.h" |
| |
| #include <fmt/format.h> |
| #include <gen_cpp/AgentService_types.h> |
| #include <gen_cpp/BackendService_types.h> |
| #include <gen_cpp/Descriptors_types.h> |
| #include <gen_cpp/MasterService_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <re2/re2.h> |
| #include <unistd.h> |
| |
| #include <algorithm> |
| #include <list> |
| #include <mutex> |
| #include <ostream> |
| #include <string_view> |
| |
| #include "absl/strings/substitute.h" |
| #include "bvar/bvar.h" |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "io/fs/local_file_system.h" |
| #include "olap/cumulative_compaction_time_series_policy.h" |
| #include "olap/data_dir.h" |
| #include "olap/olap_common.h" |
| #include "olap/olap_define.h" |
| #include "olap/olap_meta.h" |
| #include "olap/pb_helper.h" |
| #include "olap/rowset/beta_rowset.h" |
| #include "olap/rowset/rowset.h" |
| #include "olap/rowset/rowset_meta_manager.h" |
| #include "olap/storage_engine.h" |
| #include "olap/tablet.h" |
| #include "olap/tablet_meta.h" |
| #include "olap/tablet_meta_manager.h" |
| #include "olap/tablet_schema.h" |
| #include "olap/txn_manager.h" |
| #include "runtime/exec_env.h" |
| #include "service/backend_options.h" |
| #include "util/defer_op.h" |
| #include "util/doris_metrics.h" |
| #include "util/histogram.h" |
| #include "util/metrics.h" |
| #include "util/path_util.h" |
| #include "util/scoped_cleanup.h" |
| #include "util/stopwatch.hpp" |
| #include "util/time.h" |
| #include "util/trace.h" |
| #include "util/uid_util.h" |
| |
| namespace doris { |
| class CumulativeCompactionPolicy; |
| } // namespace doris |
| |
| using std::map; |
| using std::set; |
| using std::string; |
| using std::vector; |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| using namespace ErrorCode; |
| |
| bvar::Adder<int64_t> g_tablet_meta_schema_columns_count("tablet_meta_schema_columns_count"); |
| |
| TabletManager::TabletManager(StorageEngine& engine, int32_t tablet_map_lock_shard_size) |
| : _engine(engine), |
| _tablets_shards_size(tablet_map_lock_shard_size), |
| _tablets_shards_mask(tablet_map_lock_shard_size - 1) { |
| CHECK_GT(_tablets_shards_size, 0); |
| CHECK_EQ(_tablets_shards_size & _tablets_shards_mask, 0); |
| _tablets_shards.resize(_tablets_shards_size); |
| } |
| |
| TabletManager::~TabletManager() = default; |
| |
| Status TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const TabletSharedPtr& tablet, |
| bool update_meta, bool force, RuntimeProfile* profile) { |
| if (profile->get_counter("AddTablet") == nullptr) { |
| ADD_TIMER(profile, "AddTablet"); |
| } |
| Status res = Status::OK(); |
| VLOG_NOTICE << "begin to add tablet to TabletManager. " |
| << "tablet_id=" << tablet_id << ", force=" << force; |
| |
| TabletSharedPtr existed_tablet = nullptr; |
| tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
| const auto& iter = tablet_map.find(tablet_id); |
| if (iter != tablet_map.end()) { |
| existed_tablet = iter->second; |
| } |
| |
| if (existed_tablet == nullptr) { |
| return _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, false /*keep_files*/, |
| false /*drop_old*/, profile); |
| } |
| // During restore process, the tablet is exist and snapshot loader will replace the tablet's rowsets |
| // and then reload the tablet, the tablet's path will the same |
| if (!force) { |
| if (existed_tablet->tablet_path() == tablet->tablet_path()) { |
| return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
| "add the same tablet twice! tablet_id={}, tablet_path={}", tablet_id, |
| tablet->tablet_path()); |
| } |
| if (existed_tablet->data_dir() == tablet->data_dir()) { |
| return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
| "add tablet with same data dir twice! tablet_id={}", tablet_id); |
| } |
| } |
| |
| MonotonicStopWatch watch; |
| watch.start(); |
| |
| // During storage migration, the tablet is moved to another disk, have to check |
| // if the new tablet's rowset version is larger than the old one to prevent losting data during |
| // migration |
| int64_t old_time, new_time; |
| int64_t old_version, new_version; |
| { |
| std::shared_lock rdlock(existed_tablet->get_header_lock()); |
| const RowsetSharedPtr old_rowset = existed_tablet->get_rowset_with_max_version(); |
| const RowsetSharedPtr new_rowset = tablet->get_rowset_with_max_version(); |
| // If new tablet is empty, it is a newly created schema change tablet. |
| // the old tablet is dropped before add tablet. it should not exist old tablet |
| if (new_rowset == nullptr) { |
| // it seems useless to call unlock and return here. |
| // it could prevent error when log level is changed in the future. |
| return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
| "new tablet is empty and old tablet exists. it should not happen. tablet_id={}", |
| tablet_id); |
| } |
| old_time = old_rowset == nullptr ? -1 : old_rowset->creation_time(); |
| new_time = new_rowset->creation_time(); |
| old_version = old_rowset == nullptr ? -1 : old_rowset->end_version(); |
| new_version = new_rowset->end_version(); |
| } |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "GetExistTabletVersion", "AddTablet"), |
| static_cast<int64_t>(watch.reset())); |
| |
| // In restore process, we replace all origin files in tablet dir with |
| // the downloaded snapshot files. Then we try to reload tablet header. |
| // force == true means we forcibly replace the Tablet in tablet_map |
| // with the new one. But if we do so, the files in the tablet dir will be |
| // dropped when the origin Tablet deconstruct. |
| // So we set keep_files == true to not delete files when the |
| // origin Tablet deconstruct. |
| // During restore process, snapshot loader |
| // replaced the old tablet's rowset with new rowsets, but the tablet path is reused, if drop files |
| // here, the new rowset's file will also be dropped, so use keep files here |
| bool keep_files = force; |
| if (force || |
| (new_version > old_version || (new_version == old_version && new_time >= old_time))) { |
| // check if new tablet's meta is in store and add new tablet's meta to meta store |
| res = _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, keep_files, |
| true /*drop_old*/, profile); |
| } else { |
| RETURN_IF_ERROR(tablet->set_tablet_state(TABLET_SHUTDOWN)); |
| tablet->save_meta(); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "SaveMeta", "AddTablet"), |
| static_cast<int64_t>(watch.reset())); |
| { |
| std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(_shutdown_tablets_lock); |
| _shutdown_tablets.push_back(tablet); |
| } |
| |
| res = Status::Error<ENGINE_INSERT_OLD_TABLET>( |
| "set tablet to shutdown state. tablet_id={}, tablet_path={}", tablet->tablet_id(), |
| tablet->tablet_path()); |
| } |
| LOG(WARNING) << "add duplicated tablet. force=" << force << ", res=" << res |
| << ", tablet_id=" << tablet_id << ", old_version=" << old_version |
| << ", new_version=" << new_version << ", old_time=" << old_time |
| << ", new_time=" << new_time |
| << ", old_tablet_path=" << existed_tablet->tablet_path() |
| << ", new_tablet_path=" << tablet->tablet_path(); |
| |
| return res; |
| } |
| |
| Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, |
| const TabletSharedPtr& tablet, bool update_meta, |
| bool keep_files, bool drop_old, |
| RuntimeProfile* profile) { |
| // check if new tablet's meta is in store and add new tablet's meta to meta store |
| Status res = Status::OK(); |
| MonotonicStopWatch watch; |
| watch.start(); |
| if (update_meta) { |
| // call tablet save meta in order to valid the meta |
| tablet->save_meta(); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "SaveMeta", "AddTablet"), |
| static_cast<int64_t>(watch.reset())); |
| } |
| if (drop_old) { |
| // If the new tablet is fresher than the existing one, then replace |
| // the existing tablet with the new one. |
| // Use default replica_id to ignore whether replica_id is match when drop tablet. |
| Status status = _drop_tablet(tablet_id, /* replica_id */ 0, keep_files, false, true); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "DropOldTablet", "AddTablet"), |
| static_cast<int64_t>(watch.reset())); |
| RETURN_NOT_OK_STATUS_WITH_WARN( |
| status, absl::Substitute("failed to drop old tablet when add new tablet. " |
| "tablet_id=$0", |
| tablet_id)); |
| } |
| // Register tablet into DataDir, so that we can manage tablet from |
| // the perspective of root path. |
| // Example: unregister all tables when a bad disk found. |
| tablet->register_tablet_into_dir(); |
| tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
| tablet_map[tablet_id] = tablet; |
| _add_tablet_to_partition(tablet); |
| g_tablet_meta_schema_columns_count << tablet->tablet_meta()->tablet_columns_num(); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RegisterTabletInfo", "AddTablet"), |
| static_cast<int64_t>(watch.reset())); |
| |
| VLOG_NOTICE << "add tablet to map successfully." |
| << " tablet_id=" << tablet_id; |
| |
| return res; |
| } |
| |
| bool TabletManager::check_tablet_id_exist(TTabletId tablet_id) { |
| std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
| return _check_tablet_id_exist_unlocked(tablet_id); |
| } |
| |
| bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) { |
| tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
| return tablet_map.find(tablet_id) != tablet_map.end(); |
| } |
| |
| Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector<DataDir*> stores, |
| RuntimeProfile* profile) { |
| DorisMetrics::instance()->create_tablet_requests_total->increment(1); |
| |
| int64_t tablet_id = request.tablet_id; |
| LOG(INFO) << "begin to create tablet. tablet_id=" << tablet_id |
| << ", table_id=" << request.table_id << ", partition_id=" << request.partition_id |
| << ", replica_id=" << request.replica_id << ", stores.size=" << stores.size() |
| << ", first store=" << stores[0]->path(); |
| |
| // when we create rollup tablet A(assume on shard-1) from tablet B(assume on shard-2) |
| // we need use write lock on shard-1 and then use read lock on shard-2 |
| // if there have create rollup tablet C(assume on shard-2) from tablet D(assume on shard-1) at the same time, we will meet deadlock |
| std::unique_lock two_tablet_lock(_two_tablet_mtx, std::defer_lock); |
| bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; |
| bool is_schema_change_or_atomic_restore = |
| request.__isset.base_tablet_id && request.base_tablet_id > 0; |
| bool need_two_lock = |
| is_schema_change_or_atomic_restore && |
| ((_tablets_shards_mask & request.base_tablet_id) != (_tablets_shards_mask & tablet_id)); |
| if (need_two_lock) { |
| SCOPED_TIMER(ADD_TIMER(profile, "GetTwoTableLock")); |
| two_tablet_lock.lock(); |
| } |
| |
| MonotonicStopWatch shard_lock_watch; |
| shard_lock_watch.start(); |
| std::lock_guard wrlock(_get_tablets_shard_lock(tablet_id)); |
| shard_lock_watch.stop(); |
| COUNTER_UPDATE(ADD_TIMER(profile, "GetShardLock"), |
| static_cast<int64_t>(shard_lock_watch.elapsed_time())); |
| // Make create_tablet operation to be idempotent: |
| // 1. Return true if tablet with same tablet_id and schema_hash exist; |
| // false if tablet with same tablet_id but different schema_hash exist. |
| // 2. When this is an alter task, if the tablet(both tablet_id and schema_hash are |
| // same) already exist, then just return true(an duplicate request). But if |
| // tablet_id exist but with different schema_hash, return an error(report task will |
| // eventually trigger its deletion). |
| { |
| SCOPED_TIMER(ADD_TIMER(profile, "GetTabletUnlocked")); |
| if (_get_tablet_unlocked(tablet_id) != nullptr) { |
| LOG(INFO) << "success to create tablet. tablet already exist. tablet_id=" << tablet_id; |
| return Status::OK(); |
| } |
| } |
| |
| TabletSharedPtr base_tablet = nullptr; |
| // If the CreateTabletReq has base_tablet_id then it is a alter-tablet request |
| if (is_schema_change_or_atomic_restore) { |
| // if base_tablet_id's lock diffrent with new_tablet_id, we need lock it. |
| if (need_two_lock) { |
| SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTablet")); |
| base_tablet = get_tablet(request.base_tablet_id); |
| two_tablet_lock.unlock(); |
| } else { |
| SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTabletUnlocked")); |
| base_tablet = _get_tablet_unlocked(request.base_tablet_id); |
| } |
| if (base_tablet == nullptr) { |
| DorisMetrics::instance()->create_tablet_requests_failed->increment(1); |
| return Status::Error<TABLE_CREATE_META_ERROR>( |
| "fail to create tablet(change schema/atomic restore), base tablet does not " |
| "exist. new_tablet_id={}, base_tablet_id={}", |
| tablet_id, request.base_tablet_id); |
| } |
| // If we are doing schema-change or atomic-restore, we should use the same data dir |
| // TODO(lingbin): A litter trick here, the directory should be determined before |
| // entering this method |
| // |
| // ATTN: Since all restored replicas will be saved to HDD, so no storage_medium check here. |
| if (in_restore_mode || |
| request.storage_medium == base_tablet->data_dir()->storage_medium()) { |
| LOG(INFO) << "create tablet use the base tablet data dir. tablet_id=" << tablet_id |
| << ", base tablet_id=" << request.base_tablet_id |
| << ", data dir=" << base_tablet->data_dir()->path(); |
| stores.clear(); |
| stores.push_back(base_tablet->data_dir()); |
| } |
| } |
| |
| // set alter type to schema-change. it is useless |
| TabletSharedPtr tablet = _internal_create_tablet_unlocked( |
| request, is_schema_change_or_atomic_restore, base_tablet.get(), stores, profile); |
| if (tablet == nullptr) { |
| DorisMetrics::instance()->create_tablet_requests_failed->increment(1); |
| return Status::Error<CE_CMD_PARAMS_ERROR>("fail to create tablet. tablet_id={}", |
| request.tablet_id); |
| } |
| |
| LOG(INFO) << "success to create tablet. tablet_id=" << tablet_id |
| << ", tablet_path=" << tablet->tablet_path(); |
| return Status::OK(); |
| } |
| |
| TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( |
| const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, |
| const std::vector<DataDir*>& data_dirs, RuntimeProfile* profile) { |
| // If in schema-change state, base_tablet must also be provided. |
| // i.e., is_schema_change and base_tablet are either assigned or not assigned |
| DCHECK((is_schema_change && base_tablet) || (!is_schema_change && !base_tablet)); |
| |
| // NOTE: The existence of tablet_id and schema_hash has already been checked, |
| // no need check again here. |
| |
| const std::string parent_timer_name = "InternalCreateTablet"; |
| SCOPED_TIMER(ADD_TIMER(profile, parent_timer_name)); |
| |
| MonotonicStopWatch watch; |
| watch.start(); |
| auto create_meta_timer = ADD_CHILD_TIMER(profile, "CreateMeta", parent_timer_name); |
| auto tablet = _create_tablet_meta_and_dir_unlocked(request, is_schema_change, base_tablet, |
| data_dirs, profile); |
| COUNTER_UPDATE(create_meta_timer, static_cast<int64_t>(watch.reset())); |
| if (tablet == nullptr) { |
| return nullptr; |
| } |
| |
| int64_t new_tablet_id = request.tablet_id; |
| int32_t new_schema_hash = request.tablet_schema.schema_hash; |
| |
| // should remove the tablet's pending_id no matter create-tablet success or not |
| DataDir* data_dir = tablet->data_dir(); |
| |
| // TODO(yiguolei) |
| // the following code is very difficult to understand because it mixed alter tablet v2 |
| // and alter tablet v1 should remove alter tablet v1 code after v0.12 |
| Status res = Status::OK(); |
| bool is_tablet_added = false; |
| do { |
| res = tablet->init(); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "TabletInit", parent_timer_name), |
| static_cast<int64_t>(watch.reset())); |
| if (!res.ok()) { |
| LOG(WARNING) << "tablet init failed. tablet:" << tablet->tablet_id(); |
| break; |
| } |
| |
| // Create init version if this is not a restore mode replica and request.version is set |
| // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; |
| // if (!in_restore_mode && request.__isset.version) { |
| // create initial rowset before add it to storage engine could omit many locks |
| res = tablet->create_initial_rowset(request.version); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "InitRowset", parent_timer_name), |
| static_cast<int64_t>(watch.reset())); |
| if (!res.ok()) { |
| LOG(WARNING) << "fail to create initial version for tablet. res=" << res; |
| break; |
| } |
| |
| if (is_schema_change) { |
| // if this is a new alter tablet, has to set its state to not ready |
| // because schema change handler depends on it to check whether history data |
| // convert finished |
| static_cast<void>(tablet->set_tablet_state(TabletState::TABLET_NOTREADY)); |
| } |
| // Add tablet to StorageEngine will make it visible to user |
| // Will persist tablet meta |
| auto add_tablet_timer = ADD_CHILD_TIMER(profile, "AddTablet", parent_timer_name); |
| res = _add_tablet_unlocked(new_tablet_id, tablet, /*update_meta*/ true, false, profile); |
| COUNTER_UPDATE(add_tablet_timer, static_cast<int64_t>(watch.reset())); |
| if (!res.ok()) { |
| LOG(WARNING) << "fail to add tablet to StorageEngine. res=" << res; |
| break; |
| } |
| is_tablet_added = true; |
| |
| // TODO(lingbin): The following logic seems useless, can be removed? |
| // Because if _add_tablet_unlocked() return OK, we must can get it from map. |
| TabletSharedPtr tablet_ptr = _get_tablet_unlocked(new_tablet_id); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "GetTablet", parent_timer_name), |
| static_cast<int64_t>(watch.reset())); |
| if (tablet_ptr == nullptr) { |
| res = Status::Error<TABLE_NOT_FOUND>("fail to get tablet. res={}", res); |
| break; |
| } |
| } while (false); |
| |
| if (res.ok()) { |
| return tablet; |
| } |
| // something is wrong, we need clear environment |
| if (is_tablet_added) { |
| Status status = _drop_tablet(new_tablet_id, request.replica_id, false, false, true); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "DropTablet", parent_timer_name), |
| static_cast<int64_t>(watch.reset())); |
| if (!status.ok()) { |
| LOG(WARNING) << "fail to drop tablet when create tablet failed. res=" << res; |
| } |
| } else { |
| tablet->delete_all_files(); |
| static_cast<void>(TabletMetaManager::remove(data_dir, new_tablet_id, new_schema_hash)); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RemoveTabletFiles", parent_timer_name), |
| static_cast<int64_t>(watch.reset())); |
| } |
| return nullptr; |
| } |
| |
| static string _gen_tablet_dir(const string& dir, int32_t shard_id, int64_t tablet_id) { |
| string path = dir; |
| path = path_util::join_path_segments(path, DATA_PREFIX); |
| path = path_util::join_path_segments(path, std::to_string(shard_id)); |
| path = path_util::join_path_segments(path, std::to_string(tablet_id)); |
| return path; |
| } |
| |
| TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( |
| const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, |
| const std::vector<DataDir*>& data_dirs, RuntimeProfile* profile) { |
| string pending_id = TABLET_ID_PREFIX + std::to_string(request.tablet_id); |
| // Many attempts are made here in the hope that even if a disk fails, it can still continue. |
| std::string parent_timer_name = "CreateMeta"; |
| MonotonicStopWatch watch; |
| watch.start(); |
| for (auto& data_dir : data_dirs) { |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RemovePendingIds", parent_timer_name), |
| static_cast<int64_t>(watch.reset())); |
| |
| TabletMetaSharedPtr tablet_meta; |
| // if create meta failed, do not need to clean dir, because it is only in memory |
| Status res = _create_tablet_meta_unlocked(request, data_dir, is_schema_change, base_tablet, |
| &tablet_meta); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "CreateMetaUnlock", parent_timer_name), |
| static_cast<int64_t>(watch.reset())); |
| if (!res.ok()) { |
| LOG(WARNING) << "fail to create tablet meta. res=" << res |
| << ", root=" << data_dir->path(); |
| continue; |
| } |
| |
| string tablet_dir = |
| _gen_tablet_dir(data_dir->path(), tablet_meta->shard_id(), request.tablet_id); |
| string schema_hash_dir = path_util::join_path_segments( |
| tablet_dir, std::to_string(request.tablet_schema.schema_hash)); |
| |
| // Because the tablet is removed asynchronously, so that the dir may still exist when BE |
| // receive create-tablet request again, For example retried schema-change request |
| bool exists = true; |
| res = io::global_local_filesystem()->exists(schema_hash_dir, &exists); |
| if (!res.ok()) { |
| continue; |
| } |
| if (exists) { |
| LOG(WARNING) << "skip this dir because tablet path exist, path=" << schema_hash_dir; |
| continue; |
| } else { |
| Status st = io::global_local_filesystem()->create_directory(schema_hash_dir); |
| if (!st.ok()) { |
| continue; |
| } |
| } |
| |
| if (tablet_meta->partition_id() <= 0) { |
| LOG(WARNING) << "invalid partition id " << tablet_meta->partition_id() << ", tablet " |
| << tablet_meta->tablet_id(); |
| } |
| TabletSharedPtr new_tablet = |
| std::make_shared<Tablet>(_engine, std::move(tablet_meta), data_dir); |
| COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "CreateTabletFromMeta", parent_timer_name), |
| static_cast<int64_t>(watch.reset())); |
| return new_tablet; |
| } |
| return nullptr; |
| } |
| |
| Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, |
| bool is_drop_table_or_partition) { |
| return _drop_tablet(tablet_id, replica_id, false, is_drop_table_or_partition, false); |
| } |
| |
| // Drop specified tablet. |
| Status TabletManager::_drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool keep_files, |
| bool is_drop_table_or_partition, bool had_held_shard_lock) { |
| LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ", replica_id=" << replica_id |
| << ", is_drop_table_or_partition=" << is_drop_table_or_partition |
| << ", keep_files=" << keep_files; |
| DorisMetrics::instance()->drop_tablet_requests_total->increment(1); |
| |
| RETURN_IF_ERROR(register_transition_tablet(tablet_id, "drop tablet")); |
| Defer defer {[&]() { unregister_transition_tablet(tablet_id, "drop tablet"); }}; |
| |
| // Fetch tablet which need to be dropped |
| TabletSharedPtr to_drop_tablet; |
| { |
| std::unique_lock<std::shared_mutex> wlock(_get_tablets_shard_lock(tablet_id), |
| std::defer_lock); |
| if (!had_held_shard_lock) { |
| wlock.lock(); |
| } |
| to_drop_tablet = _get_tablet_unlocked(tablet_id); |
| if (to_drop_tablet == nullptr) { |
| LOG(WARNING) << "fail to drop tablet because it does not exist. " |
| << "tablet_id=" << tablet_id; |
| return Status::OK(); |
| } |
| |
| // We should compare replica id to avoid dropping new cloned tablet. |
| // Iff request replica id is 0, FE may be an older release, then we drop this tablet as before. |
| if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) { |
| return Status::Aborted("replica_id not match({} vs {})", to_drop_tablet->replica_id(), |
| replica_id); |
| } |
| |
| _remove_tablet_from_partition(to_drop_tablet); |
| tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
| tablet_map.erase(tablet_id); |
| } |
| |
| to_drop_tablet->clear_cache(); |
| |
| { |
| // drop tablet will update tablet meta, should lock |
| std::lock_guard<std::shared_mutex> wrlock(to_drop_tablet->get_header_lock()); |
| SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); |
| // NOTE: has to update tablet here, but must not update tablet meta directly. |
| // because other thread may hold the tablet object, they may save meta too. |
| // If update meta directly here, other thread may override the meta |
| // and the tablet will be loaded at restart time. |
| // To avoid this exception, we first set the state of the tablet to `SHUTDOWN`. |
| // |
| // Until now, only the restore task uses keep files. |
| RETURN_IF_ERROR(to_drop_tablet->set_tablet_state(TABLET_SHUTDOWN)); |
| if (!keep_files) { |
| LOG(INFO) << "set tablet to shutdown state and remove it from memory. " |
| << "tablet_id=" << tablet_id |
| << ", tablet_path=" << to_drop_tablet->tablet_path(); |
| // We must record unused remote rowsets path info to OlapMeta before tablet state is marked as TABLET_SHUTDOWN in OlapMeta, |
| // otherwise if BE shutdown after saving tablet state, these remote rowsets path info will lost. |
| if (is_drop_table_or_partition) { |
| RETURN_IF_ERROR(to_drop_tablet->remove_all_remote_rowsets()); |
| } |
| to_drop_tablet->save_meta(); |
| { |
| std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock); |
| _shutdown_tablets.push_back(to_drop_tablet); |
| } |
| } |
| } |
| |
| to_drop_tablet->deregister_tablet_from_dir(); |
| g_tablet_meta_schema_columns_count << -to_drop_tablet->tablet_meta()->tablet_columns_num(); |
| return Status::OK(); |
| } |
| |
| TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, bool include_deleted, string* err) { |
| std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
| return _get_tablet_unlocked(tablet_id, include_deleted, err); |
| } |
| |
| std::vector<TabletSharedPtr> TabletManager::get_all_tablet(std::function<bool(Tablet*)>&& filter) { |
| std::vector<TabletSharedPtr> res; |
| for_each_tablet([&](const TabletSharedPtr& tablet) { res.emplace_back(tablet); }, |
| std::move(filter)); |
| return res; |
| } |
| |
| void TabletManager::for_each_tablet(std::function<void(const TabletSharedPtr&)>&& handler, |
| std::function<bool(Tablet*)>&& filter) { |
| std::vector<TabletSharedPtr> tablets; |
| for (const auto& tablets_shard : _tablets_shards) { |
| tablets.clear(); |
| { |
| std::shared_lock rdlock(tablets_shard.lock); |
| for (const auto& [id, tablet] : tablets_shard.tablet_map) { |
| if (filter(tablet.get())) { |
| tablets.emplace_back(tablet); |
| } |
| } |
| } |
| for (const auto& tablet : tablets) { |
| handler(tablet); |
| } |
| } |
| } |
| |
| TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool include_deleted, |
| string* err) { |
| TabletSharedPtr tablet; |
| tablet = _get_tablet_unlocked(tablet_id); |
| if (tablet == nullptr && include_deleted) { |
| std::shared_lock rdlock(_shutdown_tablets_lock); |
| for (auto& deleted_tablet : _shutdown_tablets) { |
| CHECK(deleted_tablet != nullptr) << "deleted tablet is nullptr"; |
| if (deleted_tablet->tablet_id() == tablet_id) { |
| tablet = deleted_tablet; |
| break; |
| } |
| } |
| } |
| |
| if (tablet == nullptr) { |
| if (err != nullptr) { |
| *err = "tablet does not exist. " + BackendOptions::get_localhost(); |
| } |
| return nullptr; |
| } |
| #ifndef BE_TEST |
| if (!tablet->is_used()) { |
| LOG(WARNING) << "tablet cannot be used. tablet=" << tablet_id; |
| if (err != nullptr) { |
| *err = "tablet cannot be used. " + BackendOptions::get_localhost(); |
| } |
| return nullptr; |
| } |
| #endif |
| |
| return tablet; |
| } |
| |
| TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, TabletUid tablet_uid, |
| bool include_deleted, string* err) { |
| std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
| TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id, include_deleted, err); |
| if (tablet != nullptr && tablet->tablet_uid() == tablet_uid) { |
| return tablet; |
| } |
| return nullptr; |
| } |
| |
| uint64_t TabletManager::get_rowset_nums() { |
| uint64_t rowset_nums = 0; |
| for_each_tablet([&](const TabletSharedPtr& tablet) { rowset_nums += tablet->version_count(); }, |
| filter_all_tablets); |
| return rowset_nums; |
| } |
| |
| uint64_t TabletManager::get_segment_nums() { |
| uint64_t segment_nums = 0; |
| for_each_tablet([&](const TabletSharedPtr& tablet) { segment_nums += tablet->segment_count(); }, |
| filter_all_tablets); |
| return segment_nums; |
| } |
| |
| bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path, |
| TTabletId* tablet_id, |
| TSchemaHash* schema_hash) { |
| // the path like: /data/14/10080/964828783/ |
| static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)"); |
| // match tablet schema hash data path, for example, the path is /data/1/16791/29998 |
| // 1 is shard id , 16791 is tablet id, 29998 is schema hash |
| if (RE2::PartialMatch(path, normal_re, tablet_id, schema_hash)) { |
| return true; |
| } |
| |
| // If we can't match normal path pattern, this may be a path which is a empty tablet |
| // directory. Use this pattern to match empty tablet directory. In this case schema_hash |
| // will be set to zero. |
| static re2::RE2 empty_tablet_re("/data/\\d+/(\\d+)($|/$)"); |
| if (!RE2::PartialMatch(path, empty_tablet_re, tablet_id)) { |
| return false; |
| } |
| *schema_hash = 0; |
| return true; |
| } |
| |
| bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId* rowset_id) { |
| // the path like: /data/14/10080/964828783/02000000000000969144d8725cb62765f9af6cd3125d5a91_0.dat |
| static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*"); |
| string id_str; |
| bool ret = RE2::PartialMatch(path, re, &id_str); |
| if (ret) { |
| rowset_id->init(id_str); |
| return true; |
| } |
| return false; |
| } |
| |
| void TabletManager::get_tablet_stat(TTabletStatResult* result) { |
| std::shared_ptr<std::vector<TTabletStat>> local_cache; |
| { |
| std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex); |
| local_cache = _tablet_stat_list_cache; |
| } |
| result->__set_tablet_stat_list(*local_cache); |
| } |
| |
| struct TabletScore { |
| TabletSharedPtr tablet_ptr; |
| int score; |
| }; |
| |
| std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction( |
| CompactionType compaction_type, DataDir* data_dir, |
| const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score, |
| const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>& |
| all_cumulative_compaction_policies) { |
| int64_t now_ms = UnixMillis(); |
| const string& compaction_type_str = |
| compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative"; |
| uint32_t highest_score = 0; |
| // find the single compaction tablet |
| uint32_t single_compact_highest_score = 0; |
| TabletSharedPtr best_tablet; |
| TabletSharedPtr best_single_compact_tablet; |
| int64_t compaction_num_per_round = |
| ExecEnv::GetInstance()->storage_engine().to_local().get_compaction_num_per_round(); |
| auto cmp = [](TabletScore left, TabletScore right) { return left.score > right.score; }; |
| std::priority_queue<TabletScore, std::vector<TabletScore>, decltype(cmp)> top_tablets(cmp); |
| |
| auto handler = [&](const TabletSharedPtr& tablet_ptr) { |
| if (tablet_ptr->tablet_meta()->tablet_schema()->disable_auto_compaction()) { |
| LOG_EVERY_N(INFO, 500) << "Tablet " << tablet_ptr->tablet_id() |
| << " will be ignored by automatic compaction tasks since it's " |
| << "set to disabled automatic compaction."; |
| return; |
| } |
| |
| if (config::enable_skip_tablet_compaction && |
| tablet_ptr->should_skip_compaction(compaction_type, UnixSeconds())) { |
| return; |
| } |
| if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) { |
| return; |
| } |
| |
| auto search = tablet_submitted_compaction.find(tablet_ptr); |
| if (search != tablet_submitted_compaction.end()) { |
| return; |
| } |
| |
| int64_t last_failure_ms = tablet_ptr->last_cumu_compaction_failure_time(); |
| if (compaction_type == CompactionType::BASE_COMPACTION) { |
| last_failure_ms = tablet_ptr->last_base_compaction_failure_time(); |
| } |
| if (now_ms - last_failure_ms <= config::tablet_sched_delay_time_ms) { |
| VLOG_DEBUG << "Too often to check compaction, skip it. " |
| << "compaction_type=" << compaction_type_str |
| << ", last_failure_time_ms=" << last_failure_ms |
| << ", tablet_id=" << tablet_ptr->tablet_id(); |
| return; |
| } |
| |
| if (compaction_type == CompactionType::BASE_COMPACTION) { |
| std::unique_lock<std::mutex> lock(tablet_ptr->get_base_compaction_lock(), |
| std::try_to_lock); |
| if (!lock.owns_lock()) { |
| LOG(INFO) << "can not get base lock: " << tablet_ptr->tablet_id(); |
| return; |
| } |
| } else { |
| std::unique_lock<std::mutex> lock(tablet_ptr->get_cumulative_compaction_lock(), |
| std::try_to_lock); |
| if (!lock.owns_lock()) { |
| LOG(INFO) << "can not get cumu lock: " << tablet_ptr->tablet_id(); |
| return; |
| } |
| } |
| auto cumulative_compaction_policy = all_cumulative_compaction_policies.at( |
| tablet_ptr->tablet_meta()->compaction_policy()); |
| uint32_t current_compaction_score = tablet_ptr->calc_compaction_score(); |
| if (current_compaction_score < 5) { |
| tablet_ptr->set_skip_compaction(true, compaction_type, UnixSeconds()); |
| } |
| |
| // tablet should do single compaction |
| if (current_compaction_score > single_compact_highest_score && |
| tablet_ptr->should_fetch_from_peer()) { |
| bool ret = tablet_ptr->suitable_for_compaction(compaction_type, |
| cumulative_compaction_policy); |
| if (ret) { |
| single_compact_highest_score = current_compaction_score; |
| best_single_compact_tablet = tablet_ptr; |
| } |
| } |
| |
| if (compaction_num_per_round > 1 && !tablet_ptr->should_fetch_from_peer()) { |
| TabletScore ts; |
| ts.score = current_compaction_score; |
| ts.tablet_ptr = tablet_ptr; |
| if ((top_tablets.size() >= compaction_num_per_round && |
| current_compaction_score > top_tablets.top().score) || |
| top_tablets.size() < compaction_num_per_round) { |
| bool ret = tablet_ptr->suitable_for_compaction(compaction_type, |
| cumulative_compaction_policy); |
| if (ret) { |
| top_tablets.push(ts); |
| if (top_tablets.size() > compaction_num_per_round) { |
| top_tablets.pop(); |
| } |
| highest_score = std::max(current_compaction_score, highest_score); |
| } |
| } |
| } else { |
| if (current_compaction_score > highest_score && !tablet_ptr->should_fetch_from_peer()) { |
| bool ret = tablet_ptr->suitable_for_compaction(compaction_type, |
| cumulative_compaction_policy); |
| if (ret) { |
| highest_score = current_compaction_score; |
| best_tablet = tablet_ptr; |
| } |
| } |
| } |
| }; |
| |
| for_each_tablet(handler, filter_all_tablets); |
| std::vector<TabletSharedPtr> picked_tablet; |
| if (best_tablet != nullptr) { |
| VLOG_CRITICAL << "Found the best tablet for compaction. " |
| << "compaction_type=" << compaction_type_str |
| << ", tablet_id=" << best_tablet->tablet_id() << ", path=" << data_dir->path() |
| << ", highest_score=" << highest_score |
| << ", fetch from peer: " << best_tablet->should_fetch_from_peer(); |
| picked_tablet.emplace_back(std::move(best_tablet)); |
| } |
| |
| std::vector<TabletSharedPtr> reverse_top_tablets; |
| while (!top_tablets.empty()) { |
| reverse_top_tablets.emplace_back(top_tablets.top().tablet_ptr); |
| top_tablets.pop(); |
| } |
| |
| for (auto it = reverse_top_tablets.rbegin(); it != reverse_top_tablets.rend(); ++it) { |
| picked_tablet.emplace_back(*it); |
| } |
| |
| // pick single compaction tablet needs the highest score |
| if (best_single_compact_tablet != nullptr && single_compact_highest_score >= highest_score) { |
| VLOG_CRITICAL << "Found the best tablet for single compaction. " |
| << "compaction_type=" << compaction_type_str |
| << ", tablet_id=" << best_single_compact_tablet->tablet_id() |
| << ", path=" << data_dir->path() |
| << ", highest_score=" << single_compact_highest_score << ", fetch from peer: " |
| << best_single_compact_tablet->should_fetch_from_peer(); |
| picked_tablet.emplace_back(std::move(best_single_compact_tablet)); |
| } |
| *score = highest_score > single_compact_highest_score ? highest_score |
| : single_compact_highest_score; |
| return picked_tablet; |
| } |
| |
| Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id, |
| TSchemaHash schema_hash, std::string_view meta_binary, |
| bool update_meta, bool force, bool restore, |
| bool check_path) { |
| TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
| Status status = tablet_meta->deserialize(meta_binary); |
| if (!status.ok()) { |
| return Status::Error<HEADER_PB_PARSE_FAILED>( |
| "fail to load tablet because can not parse meta_binary string. tablet_id={}, " |
| "schema_hash={}, path={}, status={}", |
| tablet_id, schema_hash, data_dir->path(), status); |
| } |
| |
| // check if tablet meta is valid |
| if (tablet_meta->tablet_id() != tablet_id || tablet_meta->schema_hash() != schema_hash) { |
| return Status::Error<HEADER_PB_PARSE_FAILED>( |
| "fail to load tablet because meet invalid tablet meta. trying to load " |
| "tablet(tablet_id={}, schema_hash={}), but meet tablet={}, path={}", |
| tablet_id, schema_hash, tablet_meta->tablet_id(), data_dir->path()); |
| } |
| if (tablet_meta->tablet_uid().hi == 0 && tablet_meta->tablet_uid().lo == 0) { |
| return Status::Error<HEADER_PB_PARSE_FAILED>( |
| "fail to load tablet because its uid == 0. tablet={}, path={}", |
| tablet_meta->tablet_id(), data_dir->path()); |
| } |
| |
| if (restore) { |
| // we're restoring tablet from trash, tablet state should be changed from shutdown back to running |
| tablet_meta->set_tablet_state(TABLET_RUNNING); |
| } |
| |
| if (tablet_meta->partition_id() == 0) { |
| LOG(WARNING) << "tablet=" << tablet_id << " load from meta but partition id eq 0"; |
| } |
| |
| TabletSharedPtr tablet = std::make_shared<Tablet>(_engine, std::move(tablet_meta), data_dir); |
| |
| // NOTE: method load_tablet_from_meta could be called by two cases as below |
| // case 1: BE start; |
| // case 2: Clone Task/Restore |
| // For case 1 doesn't need path check because BE is just starting and not ready, |
| // just check tablet meta status to judge whether tablet is delete is enough. |
| // For case 2, If a tablet has just been copied to local BE, |
| // it may be cleared by gc-thread(see perform_tablet_gc) because the tablet meta may not be loaded to memory. |
| // So clone task should check path and then failed and retry in this case. |
| if (check_path) { |
| bool exists = true; |
| RETURN_IF_ERROR(io::global_local_filesystem()->exists(tablet->tablet_path(), &exists)); |
| if (!exists) { |
| return Status::Error<TABLE_ALREADY_DELETED_ERROR>( |
| "tablet path not exists, create tablet failed, path={}", tablet->tablet_path()); |
| } |
| } |
| |
| if (tablet->tablet_meta()->tablet_state() == TABLET_SHUTDOWN) { |
| { |
| std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(_shutdown_tablets_lock); |
| _shutdown_tablets.push_back(tablet); |
| } |
| return Status::Error<TABLE_ALREADY_DELETED_ERROR>( |
| "fail to load tablet because it is to be deleted. tablet_id={}, schema_hash={}, " |
| "path={}", |
| tablet_id, schema_hash, data_dir->path()); |
| } |
| // NOTE: We do not check tablet's initial version here, because if BE restarts when |
| // one tablet is doing schema-change, we may meet empty tablet. |
| if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) { |
| // tablet state is invalid, drop tablet |
| return Status::Error<TABLE_INDEX_VALIDATE_ERROR>( |
| "fail to load tablet. it is in running state but without delta. tablet={}, path={}", |
| tablet->tablet_id(), data_dir->path()); |
| } |
| |
| RETURN_NOT_OK_STATUS_WITH_WARN( |
| tablet->init(), absl::Substitute("tablet init failed. tablet=$0", tablet->tablet_id())); |
| |
| RuntimeProfile profile("CreateTablet"); |
| std::lock_guard<std::shared_mutex> wrlock(_get_tablets_shard_lock(tablet_id)); |
| RETURN_NOT_OK_STATUS_WITH_WARN( |
| _add_tablet_unlocked(tablet_id, tablet, update_meta, force, &profile), |
| absl::Substitute("fail to add tablet. tablet=$0", tablet->tablet_id())); |
| |
| return Status::OK(); |
| } |
| |
| Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, |
| SchemaHash schema_hash, const string& schema_hash_path, |
| bool force, bool restore) { |
| LOG(INFO) << "begin to load tablet from dir. " |
| << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash |
| << " path = " << schema_hash_path << " force = " << force << " restore = " << restore; |
| // not add lock here, because load_tablet_from_meta already add lock |
| std::string header_path = TabletMeta::construct_header_file_path(schema_hash_path, tablet_id); |
| // should change shard id before load tablet |
| std::string shard_path = |
| path_util::dir_name(path_util::dir_name(path_util::dir_name(header_path))); |
| std::string shard_str = shard_path.substr(shard_path.find_last_of('/') + 1); |
| int32_t shard = static_cast<int32_t>(stol(shard_str)); |
| |
| bool exists = false; |
| RETURN_IF_ERROR(io::global_local_filesystem()->exists(header_path, &exists)); |
| if (!exists) { |
| return Status::Error<NOT_FOUND>("fail to find header file. [header_path={}]", header_path); |
| } |
| |
| TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
| if (!tablet_meta->create_from_file(header_path).ok()) { |
| return Status::Error<ENGINE_LOAD_INDEX_TABLE_ERROR>( |
| "fail to load tablet_meta. file_path={}", header_path); |
| } |
| TabletUid tablet_uid = TabletUid::gen_uid(); |
| |
| // remove rowset binlog metas |
| auto binlog_metas_file = fmt::format("{}/rowset_binlog_metas.pb", schema_hash_path); |
| bool binlog_metas_file_exists = false; |
| auto file_exists_status = |
| io::global_local_filesystem()->exists(binlog_metas_file, &binlog_metas_file_exists); |
| if (!file_exists_status.ok()) { |
| return file_exists_status; |
| } |
| bool contain_binlog = false; |
| RowsetBinlogMetasPB rowset_binlog_metas_pb; |
| if (binlog_metas_file_exists) { |
| auto binlog_meta_filesize = std::filesystem::file_size(binlog_metas_file); |
| if (binlog_meta_filesize > 0) { |
| contain_binlog = true; |
| RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb)); |
| VLOG_DEBUG << "load rowset binlog metas from file. file_path=" << binlog_metas_file; |
| } |
| RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file)); |
| } |
| if (contain_binlog) { |
| auto binlog_dir = fmt::format("{}/_binlog", schema_hash_path); |
| RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(binlog_dir)); |
| |
| std::vector<io::FileInfo> files; |
| RETURN_IF_ERROR( |
| io::global_local_filesystem()->list(schema_hash_path, true, &files, &exists)); |
| for (auto& file : files) { |
| auto& filename = file.file_name; |
| std::string new_suffix; |
| std::string old_suffix; |
| |
| if (filename.ends_with(".binlog")) { |
| old_suffix = ".binlog"; |
| new_suffix = ".dat"; |
| } else if (filename.ends_with(".binlog-index")) { |
| old_suffix = ".binlog-index"; |
| new_suffix = ".idx"; |
| } else { |
| continue; |
| } |
| |
| std::string new_filename = filename; |
| new_filename.replace(filename.size() - old_suffix.size(), old_suffix.size(), |
| new_suffix); |
| auto from = fmt::format("{}/{}", schema_hash_path, filename); |
| auto to = fmt::format("{}/_binlog/{}", schema_hash_path, new_filename); |
| RETURN_IF_ERROR(io::global_local_filesystem()->rename(from, to)); |
| } |
| |
| auto* meta = store->get_meta(); |
| // if ingest binlog metas error, it will be gc in gc_unused_binlog_metas |
| RETURN_IF_ERROR( |
| RowsetMetaManager::ingest_binlog_metas(meta, tablet_uid, &rowset_binlog_metas_pb)); |
| } |
| |
| // has to change shard id here, because meta file maybe copied from other source |
| // its shard is different from local shard |
| tablet_meta->set_shard_id(shard); |
| // load dir is called by clone, restore, storage migration |
| // should change tablet uid when tablet object changed |
| tablet_meta->set_tablet_uid(std::move(tablet_uid)); |
| std::string meta_binary; |
| tablet_meta->serialize(&meta_binary); |
| RETURN_NOT_OK_STATUS_WITH_WARN( |
| load_tablet_from_meta(store, tablet_id, schema_hash, meta_binary, true, force, restore, |
| true), |
| absl::Substitute("fail to load tablet. header_path=$0", header_path)); |
| |
| return Status::OK(); |
| } |
| |
| Status TabletManager::report_tablet_info(TTabletInfo* tablet_info) { |
| LOG(INFO) << "begin to process report tablet info." |
| << "tablet_id=" << tablet_info->tablet_id; |
| |
| Status res = Status::OK(); |
| |
| TabletSharedPtr tablet = get_tablet(tablet_info->tablet_id); |
| if (tablet == nullptr) { |
| return Status::Error<TABLE_NOT_FOUND>("can't find tablet={}", tablet_info->tablet_id); |
| } |
| |
| tablet->build_tablet_report_info(tablet_info); |
| VLOG_TRACE << "success to process report tablet info."; |
| return res; |
| } |
| |
| void TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info) { |
| DCHECK(tablets_info != nullptr); |
| VLOG_NOTICE << "begin to build all report tablets info"; |
| |
| // build the expired txn map first, outside the tablet map lock |
| std::map<TabletInfo, std::vector<int64_t>> expire_txn_map; |
| _engine.txn_manager()->build_expire_txn_map(&expire_txn_map); |
| LOG(INFO) << "find expired transactions for " << expire_txn_map.size() << " tablets"; |
| |
| HistogramStat tablet_version_num_hist; |
| auto local_cache = std::make_shared<std::vector<TTabletStat>>(); |
| auto handler = [&](const TabletSharedPtr& tablet) { |
| auto& t_tablet = (*tablets_info)[tablet->tablet_id()]; |
| TTabletInfo& tablet_info = t_tablet.tablet_infos.emplace_back(); |
| tablet->build_tablet_report_info(&tablet_info, true, true); |
| // find expired transaction corresponding to this tablet |
| TabletInfo tinfo(tablet->tablet_id(), tablet->tablet_uid()); |
| auto find = expire_txn_map.find(tinfo); |
| if (find != expire_txn_map.end()) { |
| tablet_info.__set_transaction_ids(find->second); |
| expire_txn_map.erase(find); |
| } |
| tablet_version_num_hist.add(tablet_info.total_version_count); |
| auto& t_tablet_stat = local_cache->emplace_back(); |
| t_tablet_stat.__set_tablet_id(tablet_info.tablet_id); |
| t_tablet_stat.__set_data_size(tablet_info.data_size); |
| t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size); |
| t_tablet_stat.__set_row_count(tablet_info.row_count); |
| t_tablet_stat.__set_total_version_count(tablet_info.total_version_count); |
| t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count); |
| t_tablet_stat.__set_visible_version(tablet_info.version); |
| t_tablet_stat.__set_local_index_size(tablet_info.local_index_size); |
| t_tablet_stat.__set_local_segment_size(tablet_info.local_segment_size); |
| t_tablet_stat.__set_remote_index_size(tablet_info.remote_index_size); |
| t_tablet_stat.__set_remote_segment_size(tablet_info.remote_segment_size); |
| }; |
| for_each_tablet(handler, filter_all_tablets); |
| |
| { |
| std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex); |
| _tablet_stat_list_cache.swap(local_cache); |
| } |
| DorisMetrics::instance()->tablet_version_num_distribution->set_histogram( |
| tablet_version_num_hist); |
| LOG(INFO) << "success to build all report tablets info. tablet_count=" << tablets_info->size(); |
| } |
| |
| Status TabletManager::start_trash_sweep() { |
| DBUG_EXECUTE_IF("TabletManager.start_trash_sweep.sleep", DBUG_BLOCK); |
| std::unique_lock<std::mutex> lock(_gc_tablets_lock, std::defer_lock); |
| if (!lock.try_lock()) { |
| return Status::OK(); |
| } |
| |
| for_each_tablet([](const TabletSharedPtr& tablet) { tablet->delete_expired_stale_rowset(); }, |
| filter_all_tablets); |
| |
| if (config::enable_check_agg_and_remove_pre_rowsets_delete_bitmap) { |
| int64_t max_useless_rowset_count = 0; |
| int64_t tablet_id_with_max_useless_rowset_count = 0; |
| int64_t max_useless_rowset_version_count = 0; |
| int64_t tablet_id_with_max_useless_rowset_version_count = 0; |
| OlapStopWatch watch; |
| for_each_tablet( |
| [&](const TabletSharedPtr& tablet) { |
| int64_t useless_rowset_count = 0; |
| int64_t useless_rowset_version_count = 0; |
| tablet->check_agg_delete_bitmap_for_stale_rowsets(useless_rowset_count, |
| useless_rowset_version_count); |
| if (useless_rowset_count > max_useless_rowset_count) { |
| max_useless_rowset_count = useless_rowset_count; |
| tablet_id_with_max_useless_rowset_count = tablet->tablet_id(); |
| } |
| if (useless_rowset_version_count > max_useless_rowset_version_count) { |
| max_useless_rowset_version_count = useless_rowset_version_count; |
| tablet_id_with_max_useless_rowset_version_count = tablet->tablet_id(); |
| } |
| }, |
| filter_all_tablets); |
| g_max_rowsets_with_useless_delete_bitmap.set_value(max_useless_rowset_count); |
| g_max_rowsets_with_useless_delete_bitmap_version.set_value( |
| max_useless_rowset_version_count); |
| LOG(INFO) << "finish check_agg_delete_bitmap_for_stale_rowsets, cost(us)=" |
| << watch.get_elapse_time_us() |
| << ". max useless rowset count=" << max_useless_rowset_count |
| << ", tablet_id=" << tablet_id_with_max_useless_rowset_count |
| << ", max useless rowset version count=" << max_useless_rowset_version_count |
| << ", tablet_id=" << tablet_id_with_max_useless_rowset_version_count; |
| } |
| |
| std::list<TabletSharedPtr>::iterator last_it; |
| { |
| std::shared_lock rdlock(_shutdown_tablets_lock); |
| last_it = _shutdown_tablets.begin(); |
| if (last_it == _shutdown_tablets.end()) { |
| return Status::OK(); |
| } |
| } |
| |
| auto get_batch_tablets = [this, &last_it](int limit) { |
| std::vector<TabletSharedPtr> batch_tablets; |
| std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock); |
| while (last_it != _shutdown_tablets.end() && batch_tablets.size() < limit) { |
| // it means current tablet is referenced by other thread |
| if (last_it->use_count() > 1) { |
| last_it++; |
| } else { |
| batch_tablets.push_back(*last_it); |
| last_it = _shutdown_tablets.erase(last_it); |
| } |
| } |
| |
| return batch_tablets; |
| }; |
| |
| std::list<TabletSharedPtr> failed_tablets; |
| // return true if need continue delete |
| auto delete_one_batch = [this, get_batch_tablets, &failed_tablets]() -> bool { |
| int limit = 200; |
| for (;;) { |
| auto batch_tablets = get_batch_tablets(limit); |
| for (const auto& tablet : batch_tablets) { |
| if (_move_tablet_to_trash(tablet)) { |
| limit--; |
| } else { |
| failed_tablets.push_back(tablet); |
| } |
| } |
| if (limit <= 0) { |
| return true; |
| } |
| if (batch_tablets.empty()) { |
| return false; |
| } |
| } |
| |
| return false; |
| }; |
| |
| while (delete_one_batch()) { |
| #ifndef BE_TEST |
| sleep(1); |
| #endif |
| } |
| |
| if (!failed_tablets.empty()) { |
| std::lock_guard<std::shared_mutex> wrlock(_shutdown_tablets_lock); |
| _shutdown_tablets.splice(_shutdown_tablets.end(), failed_tablets); |
| } |
| |
| return Status::OK(); |
| } |
| |
| bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) { |
| RETURN_IF_ERROR(register_transition_tablet(tablet->tablet_id(), "move to trash")); |
| Defer defer {[&]() { unregister_transition_tablet(tablet->tablet_id(), "move to trash"); }}; |
| |
| TabletSharedPtr tablet_in_not_shutdown = get_tablet(tablet->tablet_id()); |
| if (tablet_in_not_shutdown) { |
| TSchemaHash schema_hash_not_shutdown = tablet_in_not_shutdown->schema_hash(); |
| size_t path_hash_not_shutdown = tablet_in_not_shutdown->data_dir()->path_hash(); |
| if (tablet->schema_hash() == schema_hash_not_shutdown && |
| tablet->data_dir()->path_hash() == path_hash_not_shutdown) { |
| tablet->clear_cache(); |
| // shard_id in memory not eq shard_id in shutdown |
| if (tablet_in_not_shutdown->tablet_path() != tablet->tablet_path()) { |
| LOG(INFO) << "tablet path not eq shutdown tablet path, move it to trash, tablet_id=" |
| << tablet_in_not_shutdown->tablet_id() |
| << ", mem manager tablet path=" << tablet_in_not_shutdown->tablet_path() |
| << ", shutdown tablet path=" << tablet->tablet_path(); |
| return tablet->data_dir()->move_to_trash(tablet->tablet_path()); |
| } else { |
| LOG(INFO) << "tablet path eq shutdown tablet path, not move to trash, tablet_id=" |
| << tablet_in_not_shutdown->tablet_id() |
| << ", mem manager tablet path=" << tablet_in_not_shutdown->tablet_path() |
| << ", shutdown tablet path=" << tablet->tablet_path(); |
| return true; |
| } |
| } |
| } |
| |
| TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
| int64_t get_meta_ts = MonotonicMicros(); |
| Status check_st = TabletMetaManager::get_meta(tablet->data_dir(), tablet->tablet_id(), |
| tablet->schema_hash(), tablet_meta); |
| if (check_st.ok()) { |
| if (tablet_meta->tablet_state() != TABLET_SHUTDOWN || |
| tablet_meta->tablet_uid() != tablet->tablet_uid()) { |
| LOG(WARNING) << "tablet's state changed to normal, skip remove dirs" |
| << " tablet id = " << tablet_meta->tablet_id() |
| << " schema hash = " << tablet_meta->schema_hash() |
| << " old tablet_uid=" << tablet->tablet_uid() |
| << " cur tablet_uid=" << tablet_meta->tablet_uid(); |
| return true; |
| } |
| |
| tablet->clear_cache(); |
| |
| // move data to trash |
| const auto& tablet_path = tablet->tablet_path(); |
| bool exists = false; |
| Status exists_st = io::global_local_filesystem()->exists(tablet_path, &exists); |
| if (!exists_st) { |
| return false; |
| } |
| if (exists) { |
| // take snapshot of tablet meta |
| auto meta_file_path = fmt::format("{}/{}.hdr", tablet_path, tablet->tablet_id()); |
| int64_t save_meta_ts = MonotonicMicros(); |
| auto save_st = tablet->tablet_meta()->save(meta_file_path); |
| if (!save_st.ok()) { |
| LOG(WARNING) << "failed to save meta, tablet_id=" << tablet_meta->tablet_id() |
| << ", tablet_uid=" << tablet_meta->tablet_uid() |
| << ", error=" << save_st; |
| return false; |
| } |
| int64_t now = MonotonicMicros(); |
| LOG(INFO) << "start to move tablet to trash. " << tablet_path |
| << ". rocksdb get meta cost " << (save_meta_ts - get_meta_ts) |
| << " us, rocksdb save meta cost " << (now - save_meta_ts) << " us"; |
| Status rm_st = tablet->data_dir()->move_to_trash(tablet_path); |
| if (!rm_st.ok()) { |
| LOG(WARNING) << "fail to move dir to trash. " << tablet_path; |
| return false; |
| } |
| } |
| // remove tablet meta |
| auto remove_st = TabletMetaManager::remove(tablet->data_dir(), tablet->tablet_id(), |
| tablet->schema_hash()); |
| if (!remove_st.ok()) { |
| LOG(WARNING) << "failed to remove meta, tablet_id=" << tablet_meta->tablet_id() |
| << ", tablet_uid=" << tablet_meta->tablet_uid() << ", error=" << remove_st; |
| return false; |
| } |
| LOG(INFO) << "successfully move tablet to trash. " |
| << "tablet_id=" << tablet->tablet_id() |
| << ", schema_hash=" << tablet->schema_hash() << ", tablet_path=" << tablet_path; |
| return true; |
| } else { |
| tablet->clear_cache(); |
| // if could not find tablet info in meta store, then check if dir existed |
| const auto& tablet_path = tablet->tablet_path(); |
| bool exists = false; |
| Status exists_st = io::global_local_filesystem()->exists(tablet_path, &exists); |
| if (!exists_st) { |
| return false; |
| } |
| if (exists) { |
| if (check_st.is<META_KEY_NOT_FOUND>()) { |
| LOG(INFO) << "could not find tablet meta in rocksdb, so just delete it path " |
| << "tablet_id=" << tablet->tablet_id() |
| << ", schema_hash=" << tablet->schema_hash() |
| << ", delete tablet_path=" << tablet_path; |
| RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_path)); |
| RETURN_IF_ERROR(DataDir::delete_tablet_parent_path_if_empty(tablet_path)); |
| return true; |
| } |
| LOG(WARNING) << "errors while load meta from store, skip this tablet. " |
| << "tablet_id=" << tablet->tablet_id() |
| << ", schema_hash=" << tablet->schema_hash(); |
| return false; |
| } else { |
| LOG(INFO) << "could not find tablet dir, skip it and remove it from gc-queue. " |
| << "tablet_id=" << tablet->tablet_id() |
| << ", schema_hash=" << tablet->schema_hash() |
| << ", tablet_path=" << tablet_path; |
| return true; |
| } |
| } |
| } |
| |
| Status TabletManager::register_transition_tablet(int64_t tablet_id, std::string reason) { |
| tablets_shard& shard = _get_tablets_shard(tablet_id); |
| std::thread::id thread_id = std::this_thread::get_id(); |
| std::lock_guard<std::mutex> lk(shard.lock_for_transition); |
| if (auto search = shard.tablets_under_transition.find(tablet_id); |
| search == shard.tablets_under_transition.end()) { |
| // not found |
| shard.tablets_under_transition[tablet_id] = std::make_tuple(reason, thread_id, 1); |
| LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason |
| << ", lock times=1, thread_id_in_map=" << thread_id; |
| return Status::OK(); |
| } else { |
| // found |
| auto& [r, thread_id_in_map, lock_times] = search->second; |
| if (thread_id != thread_id_in_map) { |
| // other thread, failed |
| LOG(INFO) << "tablet_id = " << tablet_id << " is doing " << r |
| << ", thread_id_in_map=" << thread_id_in_map << " , add reason=" << reason |
| << ", thread_id=" << thread_id; |
| return Status::InternalError<false>("{} failed try later, tablet_id={}", reason, |
| tablet_id); |
| } |
| // add lock times |
| ++lock_times; |
| LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason |
| << ", lock times=" << lock_times << ", thread_id_in_map=" << thread_id_in_map; |
| return Status::OK(); |
| } |
| } |
| |
| void TabletManager::unregister_transition_tablet(int64_t tablet_id, std::string reason) { |
| tablets_shard& shard = _get_tablets_shard(tablet_id); |
| std::thread::id thread_id = std::this_thread::get_id(); |
| std::lock_guard<std::mutex> lk(shard.lock_for_transition); |
| if (auto search = shard.tablets_under_transition.find(tablet_id); |
| search == shard.tablets_under_transition.end()) { |
| // impossible, bug |
| DCHECK(false) << "tablet " << tablet_id |
| << " must be found, before unreg must have been reg"; |
| } else { |
| auto& [r, thread_id_in_map, lock_times] = search->second; |
| if (thread_id_in_map != thread_id) { |
| // impossible, bug |
| DCHECK(false) << "tablet " << tablet_id << " unreg thread must same reg thread"; |
| } |
| // sub lock times |
| --lock_times; |
| if (lock_times != 0) { |
| LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason |
| << ", left=" << lock_times << ", thread_id_in_map=" << thread_id_in_map; |
| } else { |
| LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason |
| << ", thread_id_in_map=" << thread_id_in_map; |
| shard.tablets_under_transition.erase(tablet_id); |
| } |
| } |
| } |
| |
| void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id, |
| SchemaHash schema_hash, |
| const string& schema_hash_path, |
| int16_t shard_id) { |
| // acquire the read lock, so that there is no creating tablet or load tablet from meta tasks |
| // create tablet and load tablet task should check whether the dir exists |
| tablets_shard& shard = _get_tablets_shard(tablet_id); |
| std::shared_lock rdlock(shard.lock); |
| |
| // check if meta already exists |
| TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
| Status check_st = TabletMetaManager::get_meta(data_dir, tablet_id, schema_hash, tablet_meta); |
| if (check_st.ok() && tablet_meta->shard_id() == shard_id) { |
| return; |
| } |
| |
| LOG(INFO) << "tablet meta not exists, try delete tablet path " << schema_hash_path; |
| |
| bool succ = register_transition_tablet(tablet_id, "path gc"); |
| if (!succ) { |
| return; |
| } |
| Defer defer {[&]() { unregister_transition_tablet(tablet_id, "path gc"); }}; |
| |
| TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id); |
| if (tablet != nullptr && tablet->tablet_path() == schema_hash_path) { |
| LOG(INFO) << "tablet exists, skip delete the path " << schema_hash_path; |
| return; |
| } |
| |
| // TODO(ygl): may do other checks in the future |
| bool exists = false; |
| Status exists_st = io::global_local_filesystem()->exists(schema_hash_path, &exists); |
| if (exists_st && exists) { |
| LOG(INFO) << "start to move tablet to trash. tablet_path = " << schema_hash_path; |
| Status rm_st = data_dir->move_to_trash(schema_hash_path); |
| if (!rm_st.ok()) { |
| LOG(WARNING) << "fail to move dir to trash. dir=" << schema_hash_path; |
| } else { |
| LOG(INFO) << "move path " << schema_hash_path << " to trash successfully"; |
| } |
| } |
| } |
| |
| void TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_map, |
| size_t* tablet_count) { |
| DCHECK(tablet_count); |
| *tablet_count = 0; |
| auto filter = [path_map, tablet_count](Tablet* t) -> bool { |
| ++(*tablet_count); |
| auto iter = path_map->find(t->data_dir()->path()); |
| return iter != path_map->end() && iter->second.is_used; |
| }; |
| |
| auto handler = [&](const TabletSharedPtr& tablet) { |
| auto& data_dir_info = (*path_map)[tablet->data_dir()->path()]; |
| data_dir_info.local_used_capacity += tablet->tablet_local_size(); |
| data_dir_info.remote_used_capacity += tablet->tablet_remote_size(); |
| }; |
| |
| for_each_tablet(handler, filter); |
| } |
| |
| void TabletManager::get_partition_related_tablets(int64_t partition_id, |
| std::set<TabletInfo>* tablet_infos) { |
| std::shared_lock rdlock(_partitions_lock); |
| auto it = _partitions.find(partition_id); |
| if (it != _partitions.end()) { |
| *tablet_infos = it->second.tablets; |
| } |
| } |
| |
| void TabletManager::get_partitions_visible_version(std::map<int64_t, int64_t>* partitions_version) { |
| std::shared_lock rdlock(_partitions_lock); |
| for (const auto& [partition_id, partition] : _partitions) { |
| partitions_version->insert( |
| {partition_id, partition.visible_version->version.load(std::memory_order_relaxed)}); |
| } |
| } |
| |
| void TabletManager::update_partitions_visible_version( |
| const std::map<int64_t, int64_t>& partitions_version) { |
| std::shared_lock rdlock(_partitions_lock); |
| for (auto [partition_id, version] : partitions_version) { |
| auto it = _partitions.find(partition_id); |
| if (it != _partitions.end()) { |
| it->second.visible_version->update_version_monoto(version); |
| } |
| } |
| } |
| |
| void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { |
| auto filter = [data_dir](Tablet* tablet) -> bool { |
| return tablet->tablet_state() == TABLET_RUNNING && |
| tablet->data_dir()->path_hash() == data_dir->path_hash() && tablet->is_used() && |
| tablet->init_succeeded(); |
| }; |
| |
| std::vector<TabletSharedPtr> related_tablets = get_all_tablet(filter); |
| int counter = 0; |
| MonotonicStopWatch watch; |
| watch.start(); |
| for (TabletSharedPtr tablet : related_tablets) { |
| if (tablet->do_tablet_meta_checkpoint()) { |
| ++counter; |
| } |
| } |
| int64_t cost = watch.elapsed_time() / 1000 / 1000; |
| LOG(INFO) << "finish to do meta checkpoint on dir: " << data_dir->path() |
| << ", number: " << counter << ", cost(ms): " << cost; |
| } |
| |
| Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& request, DataDir* store, |
| const bool is_schema_change, |
| const Tablet* base_tablet, |
| TabletMetaSharedPtr* tablet_meta) { |
| uint32_t next_unique_id = 0; |
| std::unordered_map<uint32_t, uint32_t> col_idx_to_unique_id; |
| if (!is_schema_change) { |
| for (uint32_t col_idx = 0; col_idx < request.tablet_schema.columns.size(); ++col_idx) { |
| col_idx_to_unique_id[col_idx] = col_idx; |
| } |
| next_unique_id = cast_set<int32_t>(request.tablet_schema.columns.size()); |
| } else { |
| next_unique_id = cast_set<int32_t>(base_tablet->next_unique_id()); |
| auto& new_columns = request.tablet_schema.columns; |
| for (uint32_t new_col_idx = 0; new_col_idx < new_columns.size(); ++new_col_idx) { |
| const TColumn& column = new_columns[new_col_idx]; |
| // For schema change, compare old_tablet and new_tablet: |
| // 1. if column exist in both new_tablet and old_tablet, choose the column's |
| // unique_id in old_tablet to be the column's ordinal number in new_tablet |
| // 2. if column exists only in new_tablet, assign next_unique_id of old_tablet |
| // to the new column |
| int32_t old_col_idx = base_tablet->tablet_schema()->field_index(column.column_name); |
| if (old_col_idx != -1) { |
| uint32_t old_unique_id = |
| base_tablet->tablet_schema()->column(old_col_idx).unique_id(); |
| col_idx_to_unique_id[new_col_idx] = old_unique_id; |
| } else { |
| // Not exist in old tablet, it is a new added column |
| col_idx_to_unique_id[new_col_idx] = next_unique_id++; |
| } |
| } |
| } |
| VLOG_NOTICE << "creating tablet meta. next_unique_id=" << next_unique_id; |
| |
| // We generate a new tablet_uid for this new tablet. |
| uint64_t shard_id = store->get_shard(); |
| *tablet_meta = TabletMeta::create(request, TabletUid::gen_uid(), shard_id, next_unique_id, |
| col_idx_to_unique_id); |
| if (request.__isset.storage_format) { |
| if (request.storage_format == TStorageFormat::DEFAULT) { |
| (*tablet_meta)->set_preferred_rowset_type(_engine.default_rowset_type()); |
| } else if (request.storage_format == TStorageFormat::V1) { |
| (*tablet_meta)->set_preferred_rowset_type(ALPHA_ROWSET); |
| } else if (request.storage_format == TStorageFormat::V2) { |
| (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET); |
| } else { |
| return Status::Error<CE_CMD_PARAMS_ERROR>("invalid TStorageFormat: {}", |
| request.storage_format); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id) { |
| VLOG_NOTICE << "begin to get tablet. tablet_id=" << tablet_id; |
| tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
| const auto& iter = tablet_map.find(tablet_id); |
| if (iter != tablet_map.end()) { |
| return iter->second; |
| } |
| return nullptr; |
| } |
| |
| void TabletManager::_add_tablet_to_partition(const TabletSharedPtr& tablet) { |
| std::lock_guard<std::shared_mutex> wrlock(_partitions_lock); |
| auto& partition = _partitions[tablet->partition_id()]; |
| partition.tablets.insert(tablet->get_tablet_info()); |
| tablet->set_visible_version( |
| std::static_pointer_cast<const VersionWithTime>(partition.visible_version)); |
| } |
| |
| void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr& tablet) { |
| tablet->set_visible_version(nullptr); |
| std::lock_guard<std::shared_mutex> wrlock(_partitions_lock); |
| auto it = _partitions.find(tablet->partition_id()); |
| if (it == _partitions.end()) { |
| return; |
| } |
| |
| auto& tablets = it->second.tablets; |
| tablets.erase(tablet->get_tablet_info()); |
| if (tablets.empty()) { |
| _partitions.erase(it); |
| } |
| } |
| |
| void TabletManager::obtain_specific_quantity_tablets(vector<TabletInfo>& tablets_info, |
| int64_t num) { |
| for (const auto& tablets_shard : _tablets_shards) { |
| std::shared_lock rdlock(tablets_shard.lock); |
| for (const auto& item : tablets_shard.tablet_map) { |
| TabletSharedPtr tablet = item.second; |
| if (tablets_info.size() >= num) { |
| return; |
| } |
| if (tablet == nullptr) { |
| continue; |
| } |
| tablets_info.push_back(tablet->get_tablet_info()); |
| } |
| } |
| } |
| |
| std::shared_mutex& TabletManager::_get_tablets_shard_lock(TTabletId tabletId) { |
| return _get_tablets_shard(tabletId).lock; |
| } |
| |
| TabletManager::tablet_map_t& TabletManager::_get_tablet_map(TTabletId tabletId) { |
| return _get_tablets_shard(tabletId).tablet_map; |
| } |
| |
| TabletManager::tablets_shard& TabletManager::_get_tablets_shard(TTabletId tabletId) { |
| return _tablets_shards[tabletId & _tablets_shards_mask]; |
| } |
| |
| void TabletManager::get_tablets_distribution_on_different_disks( |
| std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk, |
| std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>& tablets_info_on_disk) { |
| std::vector<DataDir*> data_dirs = _engine.get_stores(); |
| std::map<int64_t, Partition> partitions; |
| { |
| // When drop tablet, '_partitions_lock' is locked in 'tablet_shard_lock'. |
| // To avoid locking 'tablet_shard_lock' in '_partitions_lock', we lock and |
| // copy _partitions here. |
| std::shared_lock rdlock(_partitions_lock); |
| partitions = _partitions; |
| } |
| |
| for (const auto& [partition_id, partition] : partitions) { |
| std::map<DataDir*, int64_t> tablets_num; |
| std::map<DataDir*, std::vector<TabletSize>> tablets_info; |
| for (auto* data_dir : data_dirs) { |
| tablets_num[data_dir] = 0; |
| } |
| |
| for (const auto& tablet_info : partition.tablets) { |
| // get_tablet() will hold 'tablet_shard_lock' |
| TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id); |
| if (tablet == nullptr) { |
| continue; |
| } |
| DataDir* data_dir = tablet->data_dir(); |
| size_t tablet_footprint = tablet->tablet_footprint(); |
| tablets_num[data_dir]++; |
| TabletSize tablet_size(tablet_info.tablet_id, tablet_footprint); |
| tablets_info[data_dir].push_back(tablet_size); |
| } |
| tablets_num_on_disk[partition_id] = tablets_num; |
| tablets_info_on_disk[partition_id] = tablets_info; |
| } |
| } |
| |
| struct SortCtx { |
| SortCtx(TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t cooldown_timestamp, |
| int64_t file_size) |
| : tablet(tablet), cooldown_timestamp(cooldown_timestamp), file_size(file_size) {} |
| TabletSharedPtr tablet; |
| RowsetSharedPtr rowset; |
| // to ensure the tablet with -1 would always be greater than other |
| uint64_t cooldown_timestamp; |
| int64_t file_size; |
| bool operator<(const SortCtx& other) const { |
| if (this->cooldown_timestamp == other.cooldown_timestamp) { |
| return this->file_size > other.file_size; |
| } |
| return this->cooldown_timestamp < other.cooldown_timestamp; |
| } |
| }; |
| |
| void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets, |
| std::vector<RowsetSharedPtr>* rowsets, |
| std::function<bool(const TabletSharedPtr&)> skip_tablet) { |
| std::vector<SortCtx> sort_ctx_vec; |
| std::vector<std::weak_ptr<Tablet>> candidates; |
| for_each_tablet([&](const TabletSharedPtr& tablet) { candidates.emplace_back(tablet); }, |
| filter_all_tablets); |
| auto get_cooldown_tablet = [&sort_ctx_vec, &skip_tablet](std::weak_ptr<Tablet>& t) { |
| const TabletSharedPtr& tablet = t.lock(); |
| RowsetSharedPtr rowset = nullptr; |
| if (UNLIKELY(nullptr == tablet)) { |
| return; |
| } |
| int64_t cooldown_timestamp = -1; |
| size_t file_size = -1; |
| if (!skip_tablet(tablet) && |
| (rowset = tablet->need_cooldown(&cooldown_timestamp, &file_size))) { |
| sort_ctx_vec.emplace_back(tablet, rowset, cooldown_timestamp, file_size); |
| } |
| }; |
| std::for_each(candidates.begin(), candidates.end(), get_cooldown_tablet); |
| |
| std::sort(sort_ctx_vec.begin(), sort_ctx_vec.end()); |
| |
| for (SortCtx& ctx : sort_ctx_vec) { |
| VLOG_DEBUG << "get cooldown tablet: " << ctx.tablet->tablet_id(); |
| tablets->push_back(std::move(ctx.tablet)); |
| rowsets->push_back(std::move(ctx.rowset)); |
| } |
| } |
| |
| void TabletManager::get_all_tablets_storage_format(TCheckStorageFormatResult* result) { |
| DCHECK(result != nullptr); |
| auto handler = [result](const TabletSharedPtr& tablet) { |
| if (tablet->all_beta()) { |
| result->v2_tablets.push_back(tablet->tablet_id()); |
| } else { |
| result->v1_tablets.push_back(tablet->tablet_id()); |
| } |
| }; |
| |
| for_each_tablet(handler, filter_all_tablets); |
| result->__isset.v1_tablets = true; |
| result->__isset.v2_tablets = true; |
| } |
| |
| std::set<int64_t> TabletManager::check_all_tablet_segment(bool repair) { |
| std::set<int64_t> bad_tablets; |
| std::map<int64_t, std::vector<int64_t>> repair_shard_bad_tablets; |
| auto handler = [&](const TabletSharedPtr& tablet) { |
| if (!tablet->check_all_rowset_segment()) { |
| int64_t tablet_id = tablet->tablet_id(); |
| bad_tablets.insert(tablet_id); |
| if (repair) { |
| repair_shard_bad_tablets[tablet_id & _tablets_shards_mask].push_back(tablet_id); |
| } |
| } |
| }; |
| for_each_tablet(handler, filter_all_tablets); |
| |
| for (const auto& [shard_index, shard_tablets] : repair_shard_bad_tablets) { |
| auto& tablets_shard = _tablets_shards[shard_index]; |
| auto& tablet_map = tablets_shard.tablet_map; |
| std::lock_guard<std::shared_mutex> wrlock(tablets_shard.lock); |
| for (auto tablet_id : shard_tablets) { |
| auto it = tablet_map.find(tablet_id); |
| if (it == tablet_map.end()) { |
| bad_tablets.erase(tablet_id); |
| LOG(WARNING) << "Bad tablet has be removed. tablet_id=" << tablet_id; |
| } else { |
| const auto& tablet = it->second; |
| static_cast<void>(tablet->set_tablet_state(TABLET_SHUTDOWN)); |
| tablet->save_meta(); |
| { |
| std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock( |
| _shutdown_tablets_lock); |
| _shutdown_tablets.push_back(tablet); |
| } |
| LOG(WARNING) << "There are some segments lost, set tablet to shutdown state." |
| << "tablet_id=" << tablet->tablet_id() |
| << ", tablet_path=" << tablet->tablet_path(); |
| } |
| } |
| } |
| |
| return bad_tablets; |
| } |
| |
| bool TabletManager::update_tablet_partition_id(::doris::TPartitionId partition_id, |
| ::doris::TTabletId tablet_id) { |
| std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
| TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id); |
| if (tablet == nullptr) { |
| LOG(WARNING) << "get tablet err partition_id: " << partition_id |
| << " tablet_id:" << tablet_id; |
| return false; |
| } |
| _remove_tablet_from_partition(tablet); |
| auto st = tablet->tablet_meta()->set_partition_id(partition_id); |
| if (!st.ok()) { |
| LOG(WARNING) << "set partition id err partition_id: " << partition_id |
| << " tablet_id:" << tablet_id; |
| return false; |
| } |
| _add_tablet_to_partition(tablet); |
| return true; |
| } |
| |
| void TabletManager::get_topn_tablet_delete_bitmap_score( |
| uint64_t* max_delete_bitmap_score, uint64_t* max_base_rowset_delete_bitmap_score) { |
| int64_t max_delete_bitmap_score_tablet_id = 0; |
| int64_t max_base_rowset_delete_bitmap_score_tablet_id = 0; |
| OlapStopWatch watch; |
| uint64_t total_delete_map_count = 0; |
| int n = config::check_tablet_delete_bitmap_score_top_n; |
| std::vector<std::pair<std::shared_ptr<Tablet>, int64_t>> buf; |
| buf.reserve(n + 1); |
| auto handler = [&](const TabletSharedPtr& tablet) { |
| uint64_t delete_bitmap_count = |
| tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); |
| total_delete_map_count += delete_bitmap_count; |
| if (delete_bitmap_count > *max_delete_bitmap_score) { |
| max_delete_bitmap_score_tablet_id = tablet->tablet_id(); |
| *max_delete_bitmap_score = delete_bitmap_count; |
| } |
| buf.emplace_back(std::move(tablet), delete_bitmap_count); |
| std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return a.second > b.second; }); |
| if (buf.size() > n) { |
| buf.pop_back(); |
| } |
| }; |
| for_each_tablet(handler, filter_all_tablets); |
| for (auto& [t, _] : buf) { |
| t->get_base_rowset_delete_bitmap_count(max_base_rowset_delete_bitmap_score, |
| &max_base_rowset_delete_bitmap_score_tablet_id); |
| } |
| std::stringstream ss; |
| for (auto& i : buf) { |
| ss << i.first->tablet_id() << ": " << i.second << ", "; |
| } |
| LOG(INFO) << "get_topn_tablet_delete_bitmap_score, n=" << n |
| << ", tablet size=" << _tablets_shards.size() |
| << ", total_delete_map_count=" << total_delete_map_count |
| << ", cost(us)=" << watch.get_elapse_time_us() |
| << ", max_delete_bitmap_score=" << *max_delete_bitmap_score |
| << ", max_delete_bitmap_score_tablet_id=" << max_delete_bitmap_score_tablet_id |
| << ", max_base_rowset_delete_bitmap_score=" << *max_base_rowset_delete_bitmap_score |
| << ", max_base_rowset_delete_bitmap_score_tablet_id=" |
| << max_base_rowset_delete_bitmap_score_tablet_id << ", tablets=[" << ss.str() << "]"; |
| } |
| |
| #include "common/compile_check_end.h" |
| } // end namespace doris |