| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "olap/task/engine_storage_migration_task.h" |
| |
| #include <fmt/format.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <unistd.h> |
| |
| #include <algorithm> |
| #include <ctime> |
| #include <memory> |
| #include <mutex> |
| #include <new> |
| #include <ostream> |
| #include <set> |
| #include <utility> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "io/fs/local_file_system.h" |
| #include "olap/base_tablet.h" |
| #include "olap/data_dir.h" |
| #include "olap/olap_common.h" |
| #include "olap/olap_define.h" |
| #include "olap/pb_helper.h" |
| #include "olap/rowset/rowset_meta.h" |
| #include "olap/snapshot_manager.h" |
| #include "olap/storage_engine.h" |
| #include "olap/tablet_manager.h" |
| #include "olap/txn_manager.h" |
| #include "util/doris_metrics.h" |
| #include "util/uid_util.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| |
| using std::stringstream; |
| |
| EngineStorageMigrationTask::EngineStorageMigrationTask(StorageEngine& engine, |
| TabletSharedPtr tablet, DataDir* dest_store) |
| : _engine(engine), _tablet(std::move(tablet)), _dest_store(dest_store) { |
| _task_start_time = time(nullptr); |
| _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, |
| "EngineStorageMigrationTask"); |
| } |
| |
| EngineStorageMigrationTask::~EngineStorageMigrationTask() = default; |
| |
| Status EngineStorageMigrationTask::execute() { |
| return _migrate(); |
| } |
| |
| Status EngineStorageMigrationTask::_get_versions(int64_t start_version, int64_t* end_version, |
| std::vector<RowsetSharedPtr>* consistent_rowsets) { |
| std::shared_lock rdlock(_tablet->get_header_lock()); |
| // check if tablet is in cooldown, we don't support migration in this case |
| if (_tablet->tablet_meta()->cooldown_meta_id().initialized()) { |
| LOG(WARNING) << "currently not support migrate tablet with cooldowned remote data. tablet=" |
| << _tablet->tablet_id(); |
| return Status::NotSupported( |
| "currently not support migrate tablet with cooldowned remote data"); |
| } |
| const RowsetSharedPtr last_version = _tablet->get_rowset_with_max_version(); |
| if (last_version == nullptr) { |
| return Status::InternalError("failed to get rowset with max version, tablet={}", |
| _tablet->tablet_id()); |
| } |
| |
| *end_version = last_version->end_version(); |
| if (*end_version < start_version) { |
| // rowsets are empty |
| VLOG_DEBUG << "consistent rowsets empty. tablet=" << _tablet->tablet_id() |
| << ", start_version=" << start_version << ", end_version=" << *end_version; |
| return Status::OK(); |
| } |
| auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked( |
| Version(start_version, *end_version), CaptureRowsetOps {})); |
| *consistent_rowsets = std::move(ret.rowsets); |
| return Status::OK(); |
| } |
| |
| bool EngineStorageMigrationTask::_is_timeout() { |
| int64_t time_elapsed = time(nullptr) - _task_start_time; |
| int64_t timeout = std::max<int64_t>(config::migration_task_timeout_secs, |
| _tablet->tablet_local_size() >> 20); |
| if (time_elapsed > timeout) { |
| LOG(WARNING) << "migration failed due to timeout, time_elapsed=" << time_elapsed |
| << ", tablet=" << _tablet->tablet_id(); |
| return true; |
| } |
| return false; |
| } |
| |
| Status EngineStorageMigrationTask::_check_running_txns() { |
| // need hold migration lock outside |
| int64_t partition_id; |
| std::set<int64_t> transaction_ids; |
| // check if this tablet has related running txns. if yes, can not do migration. |
| _engine.txn_manager()->get_tablet_related_txns(_tablet->tablet_id(), _tablet->tablet_uid(), |
| &partition_id, &transaction_ids); |
| if (!transaction_ids.empty()) { |
| return Status::Error<ErrorCode::INTERNAL_ERROR, false>("tablet {} has unfinished txns", |
| _tablet->tablet_id()); |
| } |
| return Status::OK(); |
| } |
| |
| Status EngineStorageMigrationTask::_check_running_txns_until_timeout( |
| std::unique_lock<std::shared_timed_mutex>* migration_wlock) { |
| // caller should not hold migration lock, and 'migration_wlock' should not be nullptr |
| // ownership of the migration_wlock is transferred to the caller if check succ |
| DCHECK_NE(migration_wlock, nullptr); |
| Status res = Status::OK(); |
| do { |
| // to avoid invalid loops, the lock is guaranteed to be acquired here |
| { |
| std::unique_lock<std::shared_timed_mutex> wlock(_tablet->get_migration_lock()); |
| if (_tablet->tablet_state() == TABLET_SHUTDOWN) { |
| return Status::Error<ErrorCode::INTERNAL_ERROR, false>("tablet {} has deleted", |
| _tablet->tablet_id()); |
| } |
| res = _check_running_txns(); |
| if (res.ok()) { |
| // transfer the lock to the caller |
| *migration_wlock = std::move(wlock); |
| return res; |
| } |
| } |
| std::this_thread::sleep_for(std::chrono::milliseconds(200)); |
| } while (!_is_timeout()); |
| return res; |
| } |
| |
| Status EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file( |
| int32_t shard, const std::string& full_path, |
| const std::vector<RowsetSharedPtr>& consistent_rowsets, int64_t end_version) { |
| // need hold migration lock and push lock outside |
| int64_t tablet_id = _tablet->tablet_id(); |
| int32_t schema_hash = _tablet->schema_hash(); |
| TabletMetaSharedPtr new_tablet_meta(new (std::nothrow) TabletMeta()); |
| { |
| std::shared_lock rdlock(_tablet->get_header_lock()); |
| _generate_new_header(shard, consistent_rowsets, new_tablet_meta, end_version); |
| } |
| std::string new_meta_file = full_path + "/" + std::to_string(tablet_id) + ".hdr"; |
| RETURN_IF_ERROR(new_tablet_meta->save(new_meta_file)); |
| |
| // it will change rowset id and its create time |
| // rowset create time is useful when load tablet from meta to check which tablet is the tablet to load |
| _pending_rs_guards = DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids( |
| full_path, tablet_id, _tablet->replica_id(), _tablet->table_id(), |
| _tablet->partition_id(), schema_hash)); |
| return Status::OK(); |
| } |
| |
| Status EngineStorageMigrationTask::_reload_tablet(const std::string& full_path) { |
| if (_tablet->tablet_state() == TABLET_SHUTDOWN) { |
| return Status::Error<ErrorCode::INTERNAL_ERROR, false>("tablet {} has deleted", |
| _tablet->tablet_id()); |
| } |
| // need hold migration lock and push lock outside |
| int64_t tablet_id = _tablet->tablet_id(); |
| int32_t schema_hash = _tablet->schema_hash(); |
| RETURN_IF_ERROR(_engine.tablet_manager()->load_tablet_from_dir(_dest_store, tablet_id, |
| schema_hash, full_path, false)); |
| |
| // if old tablet finished schema change, then the schema change status of the new tablet is DONE |
| // else the schema change status of the new tablet is FAILED |
| TabletSharedPtr new_tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
| if (new_tablet == nullptr) { |
| return Status::NotFound("could not find tablet {}", tablet_id); |
| } |
| return Status::OK(); |
| } |
| |
| // if the size less than threshold, return true |
| bool EngineStorageMigrationTask::_is_rowsets_size_less_than_threshold( |
| const std::vector<RowsetSharedPtr>& consistent_rowsets) { |
| size_t total_size = 0; |
| for (const auto& rs : consistent_rowsets) { |
| total_size += rs->index_disk_size() + rs->data_disk_size(); |
| } |
| if (total_size < config::migration_remaining_size_threshold_mb) { |
| return true; |
| } |
| return false; |
| } |
| |
| Status EngineStorageMigrationTask::_migrate() { |
| int64_t tablet_id = _tablet->tablet_id(); |
| LOG(INFO) << "begin to process tablet migrate. " |
| << "tablet_id=" << tablet_id << ", dest_store=" << _dest_store->path(); |
| |
| RETURN_IF_ERROR(_engine.tablet_manager()->register_transition_tablet(_tablet->tablet_id(), |
| "disk migrate")); |
| Defer defer {[&]() { |
| _engine.tablet_manager()->unregister_transition_tablet(_tablet->tablet_id(), |
| "disk migrate"); |
| }}; |
| |
| DorisMetrics::instance()->storage_migrate_requests_total->increment(1); |
| int64_t start_version = 0; |
| int64_t end_version = 0; |
| std::vector<RowsetSharedPtr> consistent_rowsets; |
| |
| // During migration, if the rowsets being migrated undergoes a compaction operation, |
| // that will result in incorrect delete bitmaps after migration for mow table. Therefore, |
| // compaction will be prohibited for the mow table when migration. Moreover, it is useless |
| // to perform a compaction operation on the migration data, as the migration still migrates |
| // the data of rowsets before the compaction operation. |
| std::unique_lock base_compaction_lock(_tablet->get_base_compaction_lock(), std::defer_lock); |
| std::unique_lock cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(), |
| std::defer_lock); |
| if (_tablet->enable_unique_key_merge_on_write()) { |
| base_compaction_lock.lock(); |
| cumu_compaction_lock.lock(); |
| } |
| |
| // try hold migration lock first |
| Status res; |
| int32_t shard = 0; |
| std::string full_path; |
| { |
| std::unique_lock<std::shared_timed_mutex> migration_wlock(_tablet->get_migration_lock(), |
| std::chrono::seconds(1)); |
| if (!migration_wlock.owns_lock()) { |
| return Status::InternalError("could not own migration_wlock"); |
| } |
| |
| // check if this tablet has related running txns. if yes, can not do migration. |
| RETURN_IF_ERROR(_check_running_txns()); |
| |
| std::lock_guard<std::mutex> lock(_tablet->get_push_lock()); |
| // get versions to be migrate |
| RETURN_IF_ERROR(_get_versions(start_version, &end_version, &consistent_rowsets)); |
| |
| // TODO(ygl): the tablet should not under schema change or rollup or load |
| shard = _dest_store->get_shard(); |
| |
| auto shard_path = fmt::format("{}/{}/{}", _dest_store->path(), DATA_PREFIX, shard); |
| full_path = SnapshotManager::get_schema_hash_full_path(_tablet, shard_path); |
| // if dir already exist then return err, it should not happen. |
| // should not remove the dir directly, for safety reason. |
| bool exists = true; |
| RETURN_IF_ERROR(io::global_local_filesystem()->exists(full_path, &exists)); |
| if (exists) { |
| return Status::AlreadyExist("schema hash path {} already exist, skip this path", |
| full_path); |
| } |
| RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(full_path)); |
| } |
| |
| std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets); |
| RowsetBinlogMetasPB rowset_binlog_metas_pb; |
| do { |
| // migrate all index and data files but header file |
| res = _copy_index_and_data_files(full_path, temp_consistent_rowsets, |
| &rowset_binlog_metas_pb); |
| if (!res.ok()) { |
| break; |
| } |
| std::unique_lock<std::shared_timed_mutex> migration_wlock; |
| res = _check_running_txns_until_timeout(&migration_wlock); |
| if (!res.ok()) { |
| break; |
| } |
| std::lock_guard<std::mutex> lock(_tablet->get_push_lock()); |
| start_version = end_version; |
| // clear temp rowsets before get remaining versions |
| temp_consistent_rowsets.clear(); |
| // get remaining versions |
| res = _get_versions(end_version + 1, &end_version, &temp_consistent_rowsets); |
| if (!res.ok()) { |
| break; |
| } |
| if (start_version < end_version) { |
| // we have remaining versions to be migrated |
| consistent_rowsets.insert(consistent_rowsets.end(), temp_consistent_rowsets.begin(), |
| temp_consistent_rowsets.end()); |
| LOG(INFO) << "we have remaining versions to be migrated. start_version=" |
| << start_version << " end_version=" << end_version; |
| // if the remaining size is less than config::migration_remaining_size_threshold_mb(default 10MB), |
| // we take the lock to complete it to avoid long-term competition with other tasks |
| if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) { |
| // force to copy the remaining data and index |
| res = _copy_index_and_data_files(full_path, temp_consistent_rowsets, |
| &rowset_binlog_metas_pb); |
| if (!res.ok()) { |
| break; |
| } |
| } else { |
| if (_is_timeout()) { |
| res = Status::TimedOut("failed to migrate due to timeout"); |
| break; |
| } |
| // there is too much remaining data here. |
| // in order not to affect other tasks, release the lock and then copy it |
| continue; |
| } |
| } |
| |
| // save rowset binlog metas |
| if (rowset_binlog_metas_pb.rowset_binlog_metas_size() > 0) { |
| auto rowset_binlog_metas_pb_filename = |
| fmt::format("{}/rowset_binlog_metas.pb", full_path); |
| res = write_pb(rowset_binlog_metas_pb_filename, rowset_binlog_metas_pb); |
| if (!res.ok()) { |
| break; |
| } |
| } |
| |
| // generate new tablet meta and write to hdr file |
| res = _gen_and_write_header_to_hdr_file(shard, full_path, consistent_rowsets, end_version); |
| if (!res.ok()) { |
| break; |
| } |
| res = _reload_tablet(full_path); |
| if (!res.ok()) { |
| break; |
| } |
| |
| break; |
| } while (true); |
| |
| if (!res.ok()) { |
| // we should remove the dir directly for avoid disk full of junk data, and it's safe to remove |
| RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(full_path)); |
| RETURN_IF_ERROR(DataDir::delete_tablet_parent_path_if_empty(full_path)); |
| } |
| return res; |
| } |
| |
| // TODO(ygl): lost some information here, such as cumulative layer point |
| void EngineStorageMigrationTask::_generate_new_header( |
| int32_t new_shard, const std::vector<RowsetSharedPtr>& consistent_rowsets, |
| TabletMetaSharedPtr new_tablet_meta, int64_t end_version) { |
| _tablet->generate_tablet_meta_copy_unlocked(*new_tablet_meta, false); |
| |
| std::vector<RowsetMetaSharedPtr> rs_metas; |
| for (auto& rs : consistent_rowsets) { |
| rs_metas.push_back(rs->rowset_meta()); |
| } |
| new_tablet_meta->revise_rs_metas(std::move(rs_metas)); |
| if (_tablet->keys_type() == UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { |
| DeleteBitmap bm = _tablet->tablet_meta()->delete_bitmap().snapshot(end_version); |
| new_tablet_meta->revise_delete_bitmap_unlocked(bm); |
| } |
| new_tablet_meta->set_shard_id(new_shard); |
| // should not save new meta here, because new tablet may failed |
| // should not remove the old meta here, because the new header maybe not valid |
| // remove old meta after the new tablet is loaded successfully |
| } |
| |
// Copies the files of `consistent_rowsets` into `full_path`, along with their binlog files.
//
// For every rowset:
//   - its data/index files are copied via Rowset::copy_files_to, keeping the rowset id;
//   - the tablet's rowset binlog metas for the rowset's version are collected (presumably
//     appended by get_rowset_binlog_metas; verify).
// For every collected binlog meta, each segment's binlog file is copied to
// "<full_path>/<rowset_id>_<segment>.binlog", together with its inverted/ANN index files:
//   - V1 index storage: one file per index, "<rowset_id>_<segment>_<index_id>.binlog-index";
//   - otherwise: a single file per segment, "<rowset_id>_<segment>.binlog-index".
// Finally the collected binlog metas are moved (appended) into `all_binlog_metas_pb`; it is only
// touched if every copy succeeded.
// Returns the first error encountered; files already copied are left in place (the caller
// removes `full_path` on failure).
Status EngineStorageMigrationTask::_copy_index_and_data_files(
        const std::string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets,
        RowsetBinlogMetasPB* all_binlog_metas_pb) const {
    RowsetBinlogMetasPB rowset_binlog_metas_pb;
    for (const auto& rs : consistent_rowsets) {
        RETURN_IF_ERROR(rs->copy_files_to(full_path, rs->rowset_id()));

        Version binlog_versions = rs->version();
        RETURN_IF_ERROR(_tablet->get_rowset_binlog_metas(binlog_versions, &rowset_binlog_metas_pb));
    }

    // copy index binlog files.
    for (const auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) {
        auto num_segments = rowset_binlog_meta.num_segments();
        std::string_view rowset_id = rowset_binlog_meta.rowset_id();

        // The binlog meta's `data` is a serialized RowsetMetaPB; its tablet schema determines
        // which index files exist for each segment.
        RowsetMetaPB rowset_meta_pb;
        if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
            auto err_msg = fmt::format("fail to parse binlog meta data value:{}",
                                       rowset_binlog_meta.data());
            LOG(WARNING) << err_msg;
            return Status::InternalError(err_msg);
        }
        const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
        TabletSchema tablet_schema;
        tablet_schema.init_from_pb(tablet_schema_pb);

        // copy segment files and index files
        for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
            // Source path is resolved through the tablet (presumably its current data dir).
            std::string segment_file_path = _tablet->get_segment_filepath(rowset_id, segment_index);
            auto snapshot_segment_file_path =
                    fmt::format("{}/{}_{}.binlog", full_path, rowset_id, segment_index);

            Status status = io::global_local_filesystem()->copy_path(segment_file_path,
                                                                     snapshot_segment_file_path);
            if (!status.ok()) {
                LOG(WARNING) << "fail to copy binlog segment file. [src=" << segment_file_path
                             << ", dest=" << snapshot_segment_file_path << "]" << status;
                return status;
            }
            VLOG_DEBUG << "copy " << segment_file_path << " to " << snapshot_segment_file_path;

            if (tablet_schema.get_inverted_index_storage_format() ==
                InvertedIndexStorageFormatPB::V1) {
                // V1: each inverted index of the segment lives in its own file.
                for (const auto& index : tablet_schema.inverted_indexes()) {
                    auto index_id = index->index_id();
                    auto index_file = InvertedIndexDescriptor::get_index_file_path_v1(
                            InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
                            index_id, index->get_index_suffix());
                    auto snapshot_segment_index_file_path =
                            fmt::format("{}/{}_{}_{}.binlog-index", full_path, rowset_id,
                                        segment_index, index_id);
                    VLOG_DEBUG << "copy " << index_file << " to "
                               << snapshot_segment_index_file_path;
                    status = io::global_local_filesystem()->copy_path(
                            index_file, snapshot_segment_index_file_path);
                    if (!status.ok()) {
                        LOG(WARNING)
                                << "fail to copy binlog index file. [src=" << index_file
                                << ", dest=" << snapshot_segment_index_file_path << "]" << status;
                        return status;
                    }
                }
            } else if (tablet_schema.has_inverted_index() || tablet_schema.has_ann_index()) {
                // Non-V1 formats: all indexes of the segment share a single file (v2 path).
                auto index_file = InvertedIndexDescriptor::get_index_file_path_v2(
                        InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
                auto snapshot_segment_index_file_path =
                        fmt::format("{}/{}_{}.binlog-index", full_path, rowset_id, segment_index);
                VLOG_DEBUG << "copy " << index_file << " to " << snapshot_segment_index_file_path;
                status = io::global_local_filesystem()->copy_path(index_file,
                                                                  snapshot_segment_index_file_path);
                if (!status.ok()) {
                    LOG(WARNING) << "fail to copy binlog index file. [src=" << index_file
                                 << ", dest=" << snapshot_segment_index_file_path << "]" << status;
                    return status;
                }
            }
        }
    }

    // Move (rather than copy) the collected metas onto the end of the caller's accumulator.
    std::move(rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->begin(),
              rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->end(),
              google::protobuf::RepeatedFieldBackInserter(
                      all_binlog_metas_pb->mutable_rowset_binlog_metas()));

    return Status::OK();
}
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |