| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "olap/rowset/beta_rowset.h" |
| |
| #include <crc32c/crc32c.h> |
| #include <ctype.h> |
| #include <errno.h> |
| #include <fmt/format.h> |
| |
| #include <algorithm> |
| #include <filesystem> |
| #include <memory> |
| #include <ostream> |
| #include <utility> |
| |
| #include "beta_rowset.h" |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "io/fs/file_reader.h" |
| #include "io/fs/file_system.h" |
| #include "io/fs/local_file_system.h" |
| #include "io/fs/path.h" |
| #include "io/fs/remote_file_system.h" |
| #include "olap/olap_common.h" |
| #include "olap/olap_define.h" |
| #include "olap/rowset/beta_rowset_reader.h" |
| #include "olap/rowset/rowset.h" |
| #include "olap/rowset/segment_v2/index_file_reader.h" |
| #include "olap/rowset/segment_v2/inverted_index_cache.h" |
| #include "olap/rowset/segment_v2/inverted_index_desc.h" |
| #include "olap/segment_loader.h" |
| #include "olap/tablet_schema.h" |
| #include "olap/utils.h" |
| #include "util/debug_points.h" |
| #include "util/doris_metrics.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| using namespace ErrorCode; |
| |
| std::string BetaRowset::local_segment_path_segcompacted(const std::string& tablet_path, |
| const RowsetId& rowset_id, int64_t begin, |
| int64_t end) { |
| // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{begin_seg}-{end_seg}.dat |
| return fmt::format("{}/{}_{}-{}.dat", tablet_path, rowset_id.to_string(), begin, end); |
| } |
| |
| BetaRowset::BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta, |
| std::string tablet_path) |
| : Rowset(schema, rowset_meta, std::move(tablet_path)) {} |
| |
| BetaRowset::~BetaRowset() = default; |
| |
| Status BetaRowset::init() { |
| return Status::OK(); // no op |
| } |
| |
| Status BetaRowset::get_segment_num_rows(std::vector<uint32_t>* segment_rows, |
| OlapReaderStatistics* read_stats) { |
| // `ROWSET_UNLOADING` is state for closed() called but owned by some readers. |
| // So here `ROWSET_UNLOADING` is allowed. |
| DCHECK_NE(_rowset_state_machine.rowset_state(), ROWSET_UNLOADED); |
| |
| RETURN_IF_ERROR(_load_segment_rows_once.call([this, read_stats] { |
| auto segment_count = num_segments(); |
| _segments_rows.resize(segment_count); |
| for (int64_t i = 0; i != segment_count; ++i) { |
| SegmentCacheHandle segment_cache_handle; |
| RETURN_IF_ERROR(SegmentLoader::instance()->load_segment( |
| std::static_pointer_cast<BetaRowset>(shared_from_this()), i, |
| &segment_cache_handle, false, false, read_stats)); |
| const auto& tmp_segments = segment_cache_handle.get_segments(); |
| _segments_rows[i] = tmp_segments[0]->num_rows(); |
| } |
| return Status::OK(); |
| })); |
| segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend()); |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::get_inverted_index_size(int64_t* index_size) { |
| const auto& fs = _rowset_meta->fs(); |
| if (!fs) { |
| return Status::Error<INIT_FAILED>("get fs failed, resource_id={}", |
| _rowset_meta->resource_id()); |
| } |
| |
| if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
| for (const auto& index : _schema->inverted_indexes()) { |
| for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
| auto seg_path = DORIS_TRY(segment_path(seg_id)); |
| int64_t file_size = 0; |
| |
| std::string inverted_index_file_path = |
| InvertedIndexDescriptor::get_index_file_path_v1( |
| InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
| index->index_id(), index->get_index_suffix()); |
| RETURN_IF_ERROR(fs->file_size(inverted_index_file_path, &file_size)); |
| *index_size += file_size; |
| } |
| } |
| } else { |
| for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
| auto seg_path = DORIS_TRY(segment_path(seg_id)); |
| int64_t file_size = 0; |
| |
| std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2( |
| InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
| RETURN_IF_ERROR(fs->file_size(inverted_index_file_path, &file_size)); |
| *index_size += file_size; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| void BetaRowset::clear_inverted_index_cache() { |
| for (int i = 0; i < num_segments(); ++i) { |
| auto seg_path = segment_path(i); |
| if (!seg_path) { |
| continue; |
| } |
| |
| auto index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path); |
| for (const auto& column : tablet_schema()->columns()) { |
| auto index_metas = tablet_schema()->inverted_indexs(*column); |
| for (const auto& index_meta : index_metas) { |
| auto inverted_index_file_cache_key = |
| InvertedIndexDescriptor::get_index_file_cache_key( |
| index_path_prefix, index_meta->index_id(), |
| index_meta->get_index_suffix()); |
| (void)segment_v2::InvertedIndexSearcherCache::instance()->erase( |
| inverted_index_file_cache_key); |
| } |
| } |
| } |
| } |
| |
| Status BetaRowset::get_segments_size(std::vector<size_t>* segments_size) { |
| auto fs = _rowset_meta->fs(); |
| if (!fs) { |
| return Status::Error<INIT_FAILED>("get fs failed, resource_id={}", |
| _rowset_meta->resource_id()); |
| } |
| |
| for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
| auto seg_path = DORIS_TRY(segment_path(seg_id)); |
| int64_t file_size; |
| RETURN_IF_ERROR(fs->file_size(seg_path, &file_size)); |
| segments_size->push_back(file_size); |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) { |
| return load_segments(0, num_segments(), segments); |
| } |
| |
| Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end, |
| std::vector<segment_v2::SegmentSharedPtr>* segments) { |
| int64_t seg_id = seg_id_begin; |
| while (seg_id < seg_id_end) { |
| std::shared_ptr<segment_v2::Segment> segment; |
| RETURN_IF_ERROR(load_segment(seg_id, nullptr, &segment)); |
| segments->push_back(std::move(segment)); |
| seg_id++; |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::load_segment(int64_t seg_id, OlapReaderStatistics* stats, |
| segment_v2::SegmentSharedPtr* segment) { |
| auto fs = _rowset_meta->fs(); |
| if (!fs) { |
| return Status::Error<INIT_FAILED>("get fs failed"); |
| } |
| |
| DCHECK(seg_id >= 0); |
| auto seg_path = DORIS_TRY(segment_path(seg_id)); |
| io::FileReaderOptions reader_options { |
| .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
| : io::FileCachePolicy::NO_CACHE, |
| .is_doris_table = true, |
| .cache_base_path = "", |
| .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)), |
| .tablet_id = _rowset_meta->tablet_id(), |
| }; |
| |
| auto s = segment_v2::Segment::open( |
| fs, seg_path, _rowset_meta->tablet_id(), static_cast<uint32_t>(seg_id), rowset_id(), |
| _schema, reader_options, segment, |
| _rowset_meta->inverted_index_file_info(static_cast<int>(seg_id)), stats); |
| if (!s.ok()) { |
| LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << rowset_id() |
| << " : " << s.to_string(); |
| return s; |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::create_reader(RowsetReaderSharedPtr* result) { |
| // NOTE: We use std::static_pointer_cast for performance |
| result->reset(new BetaRowsetReader(std::static_pointer_cast<BetaRowset>(shared_from_this()))); |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::remove() { |
| if (!is_local()) { |
| DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
| return Status::OK(); |
| } |
| |
| // TODO should we close and remove all segment reader first? |
| VLOG_NOTICE << "begin to remove files in rowset " << rowset_id() |
| << ", version:" << start_version() << "-" << end_version() |
| << ", tabletid:" << _rowset_meta->tablet_id(); |
| // If the rowset was removed, it need to remove the fds in segment cache directly |
| clear_cache(); |
| |
| bool success = true; |
| Status st; |
| const auto& fs = io::global_local_filesystem(); |
| for (int i = 0; i < num_segments(); ++i) { |
| auto seg_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
| LOG(INFO) << "deleting " << seg_path; |
| st = fs->delete_file(seg_path); |
| if (!st.ok()) { |
| LOG(WARNING) << st.to_string(); |
| success = false; |
| } |
| |
| if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
| for (const auto& column : _schema->columns()) { |
| auto index_metas = _schema->inverted_indexs(*column); |
| for (const auto& index_meta : index_metas) { |
| std::string inverted_index_file = |
| InvertedIndexDescriptor::get_index_file_path_v1( |
| InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
| index_meta->index_id(), index_meta->get_index_suffix()); |
| st = fs->delete_file(inverted_index_file); |
| if (!st.ok()) { |
| LOG(WARNING) << st.to_string(); |
| success = false; |
| } |
| } |
| } |
| } else { |
| if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
| std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
| InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
| st = fs->delete_file(inverted_index_file); |
| if (!st.ok()) { |
| LOG(WARNING) << st.to_string(); |
| success = false; |
| } |
| } |
| } |
| } |
| if (!success) { |
| return Status::Error<ROWSET_DELETE_FILE_FAILED>("failed to remove files in rowset {}", |
| rowset_id().to_string()); |
| } |
| return Status::OK(); |
| } |
| |
| void BetaRowset::do_close() { |
| // do nothing. |
| } |
| |
| Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id, |
| size_t new_rowset_start_seg_id, |
| std::set<int64_t>* without_index_uids) { |
| if (!is_local()) { |
| DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
| return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
| _rowset_meta->tablet_id(), rowset_id().to_string()); |
| } |
| |
| const auto& local_fs = io::global_local_filesystem(); |
| Status status; |
| std::vector<std::string> linked_success_files; |
| Defer remove_linked_files {[&]() { // clear linked files if errors happen |
| if (!status.ok()) { |
| LOG(WARNING) << "will delete linked success files due to error " << status; |
| std::vector<io::Path> paths; |
| for (auto& file : linked_success_files) { |
| paths.emplace_back(file); |
| LOG(WARNING) << "will delete linked success file " << file << " due to error"; |
| } |
| static_cast<void>(local_fs->batch_delete(paths)); |
| LOG(WARNING) << "done delete linked success files due to error " << status; |
| } |
| }}; |
| |
| for (int i = 0; i < num_segments(); ++i) { |
| auto dst_path = |
| local_segment_path(dir, new_rowset_id.to_string(), i + new_rowset_start_seg_id); |
| bool dst_path_exist = false; |
| if (!local_fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) { |
| status = Status::Error<FILE_ALREADY_EXIST>( |
| "failed to create hard link, file already exist: {}", dst_path); |
| return status; |
| } |
| auto src_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
| // TODO(lingbin): how external storage support link? |
| // use copy? or keep refcount to avoid being delete? |
| if (!local_fs->link_file(src_path, dst_path).ok()) { |
| status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}", |
| src_path, dst_path, Errno::no()); |
| return status; |
| } |
| linked_success_files.push_back(dst_path); |
| DBUG_EXECUTE_IF("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", { |
| status = Status::Error<OS_ERROR>("fault_inject link_file error"); |
| return status; |
| }); |
| if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
| for (const auto& index : _schema->inverted_indexes()) { |
| auto index_id = index->index_id(); |
| if (without_index_uids != nullptr && without_index_uids->count(index_id)) { |
| continue; |
| } |
| std::string inverted_index_src_file_path = |
| InvertedIndexDescriptor::get_index_file_path_v1( |
| InvertedIndexDescriptor::get_index_file_path_prefix(src_path), |
| index_id, index->get_index_suffix()); |
| std::string inverted_index_dst_file_path = |
| InvertedIndexDescriptor::get_index_file_path_v1( |
| InvertedIndexDescriptor::get_index_file_path_prefix(dst_path), |
| index_id, index->get_index_suffix()); |
| bool index_file_exists = true; |
| RETURN_IF_ERROR(local_fs->exists(inverted_index_src_file_path, &index_file_exists)); |
| if (index_file_exists) { |
| DBUG_EXECUTE_IF( |
| "fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", { |
| status = Status::Error<OS_ERROR>( |
| "fault_inject link_file error from={}, to={}", |
| inverted_index_src_file_path, inverted_index_dst_file_path); |
| return status; |
| }); |
| if (!local_fs->link_file(inverted_index_src_file_path, |
| inverted_index_dst_file_path) |
| .ok()) { |
| status = Status::Error<OS_ERROR>( |
| "fail to create hard link. from={}, to={}, errno={}", |
| inverted_index_src_file_path, inverted_index_dst_file_path, |
| Errno::no()); |
| return status; |
| } |
| linked_success_files.push_back(inverted_index_dst_file_path); |
| LOG(INFO) << "success to create hard link. from=" |
| << inverted_index_src_file_path << ", " |
| << "to=" << inverted_index_dst_file_path; |
| } else { |
| LOG(WARNING) << "skip create hard link to not existed index file=" |
| << inverted_index_src_file_path; |
| } |
| } |
| } else { |
| if ((_schema->has_inverted_index() || _schema->has_ann_index()) && |
| (without_index_uids == nullptr || without_index_uids->empty())) { |
| std::string inverted_index_file_src = |
| InvertedIndexDescriptor::get_index_file_path_v2( |
| InvertedIndexDescriptor::get_index_file_path_prefix(src_path)); |
| std::string inverted_index_file_dst = |
| InvertedIndexDescriptor::get_index_file_path_v2( |
| InvertedIndexDescriptor::get_index_file_path_prefix(dst_path)); |
| bool index_dst_path_exist = false; |
| |
| if (!local_fs->exists(inverted_index_file_dst, &index_dst_path_exist).ok() || |
| index_dst_path_exist) { |
| status = Status::Error<FILE_ALREADY_EXIST>( |
| "failed to create hard link, file already exist: {}", |
| inverted_index_file_dst); |
| return status; |
| } |
| if (!local_fs->link_file(inverted_index_file_src, inverted_index_file_dst).ok()) { |
| status = Status::Error<OS_ERROR>( |
| "fail to create hard link. from={}, to={}, errno={}", |
| inverted_index_file_src, inverted_index_file_dst, Errno::no()); |
| return status; |
| } |
| linked_success_files.push_back(inverted_index_file_dst); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) { |
| if (!is_local()) { |
| DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
| return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
| _rowset_meta->tablet_id(), rowset_id().to_string()); |
| } |
| |
| bool exists = false; |
| for (int i = 0; i < num_segments(); ++i) { |
| auto dst_path = local_segment_path(dir, new_rowset_id.to_string(), i); |
| RETURN_IF_ERROR(io::global_local_filesystem()->exists(dst_path, &exists)); |
| if (exists) { |
| return Status::Error<FILE_ALREADY_EXIST>("file already exist: {}", dst_path); |
| } |
| auto src_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
| RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(src_path, dst_path)); |
| if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
| for (const auto& column : _schema->columns()) { |
| // if (column.has_inverted_index()) { |
| auto index_metas = _schema->inverted_indexs(*column); |
| for (const auto& index_meta : index_metas) { |
| std::string inverted_index_src_file_path = |
| InvertedIndexDescriptor::get_index_file_path_v1( |
| InvertedIndexDescriptor::get_index_file_path_prefix(src_path), |
| index_meta->index_id(), index_meta->get_index_suffix()); |
| std::string inverted_index_dst_file_path = |
| InvertedIndexDescriptor::get_index_file_path_v1( |
| InvertedIndexDescriptor::get_index_file_path_prefix(dst_path), |
| index_meta->index_id(), index_meta->get_index_suffix()); |
| RETURN_IF_ERROR(io::global_local_filesystem()->copy_path( |
| inverted_index_src_file_path, inverted_index_dst_file_path)); |
| LOG(INFO) << "success to copy file. from=" << inverted_index_src_file_path |
| << ", " |
| << "to=" << inverted_index_dst_file_path; |
| } |
| } |
| } else { |
| if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
| std::string inverted_index_src_file = |
| InvertedIndexDescriptor::get_index_file_path_v2( |
| InvertedIndexDescriptor::get_index_file_path_prefix(src_path)); |
| std::string inverted_index_dst_file = |
| InvertedIndexDescriptor::get_index_file_path_v2( |
| InvertedIndexDescriptor::get_index_file_path_prefix(dst_path)); |
| RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(inverted_index_src_file, |
| inverted_index_dst_file)); |
| LOG(INFO) << "success to copy file. from=" << inverted_index_src_file << ", " |
| << "to=" << inverted_index_dst_file; |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::upload_to(const StorageResource& dest_fs, const RowsetId& new_rowset_id) { |
| if (!is_local()) { |
| DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
| return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
| _rowset_meta->tablet_id(), rowset_id().to_string()); |
| } |
| |
| if (num_segments() < 1) { |
| return Status::OK(); |
| } |
| std::vector<io::Path> local_paths; |
| local_paths.reserve(num_segments()); |
| std::vector<io::Path> dest_paths; |
| dest_paths.reserve(num_segments()); |
| for (int i = 0; i < num_segments(); ++i) { |
| // Note: Here we use relative path for remote. |
| auto remote_seg_path = dest_fs.remote_segment_path(_rowset_meta->tablet_id(), |
| new_rowset_id.to_string(), i); |
| auto local_seg_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
| dest_paths.emplace_back(remote_seg_path); |
| local_paths.emplace_back(local_seg_path); |
| if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
| for (const auto& column : _schema->columns()) { |
| // if (column.has_inverted_index()) { |
| auto index_metas = _schema->inverted_indexs(*column); |
| for (const auto& index_meta : index_metas) { |
| std::string remote_inverted_index_file = |
| InvertedIndexDescriptor::get_index_file_path_v1( |
| InvertedIndexDescriptor::get_index_file_path_prefix( |
| remote_seg_path), |
| index_meta->index_id(), index_meta->get_index_suffix()); |
| std::string local_inverted_index_file = |
| InvertedIndexDescriptor::get_index_file_path_v1( |
| InvertedIndexDescriptor::get_index_file_path_prefix( |
| local_seg_path), |
| index_meta->index_id(), index_meta->get_index_suffix()); |
| dest_paths.emplace_back(remote_inverted_index_file); |
| local_paths.emplace_back(local_inverted_index_file); |
| } |
| } |
| } else { |
| if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
| std::string remote_inverted_index_file = |
| InvertedIndexDescriptor::get_index_file_path_v2( |
| InvertedIndexDescriptor::get_index_file_path_prefix( |
| remote_seg_path)); |
| std::string local_inverted_index_file = |
| InvertedIndexDescriptor::get_index_file_path_v2( |
| InvertedIndexDescriptor::get_index_file_path_prefix( |
| local_seg_path)); |
| dest_paths.emplace_back(remote_inverted_index_file); |
| local_paths.emplace_back(local_inverted_index_file); |
| } |
| } |
| } |
| auto st = dest_fs.fs->batch_upload(local_paths, dest_paths); |
| if (st.ok()) { |
| DorisMetrics::instance()->upload_rowset_count->increment(1); |
| DorisMetrics::instance()->upload_total_byte->increment(total_disk_size()); |
| } else { |
| DorisMetrics::instance()->upload_fail_count->increment(1); |
| } |
| return st; |
| } |
| |
| Status BetaRowset::check_file_exist() { |
| const auto& fs = _rowset_meta->fs(); |
| if (!fs) { |
| return Status::InternalError("fs is not initialized, resource_id={}", |
| _rowset_meta->resource_id()); |
| } |
| |
| for (int i = 0; i < num_segments(); ++i) { |
| auto seg_path = DORIS_TRY(segment_path(i)); |
| bool seg_file_exist = false; |
| RETURN_IF_ERROR(fs->exists(seg_path, &seg_file_exist)); |
| if (!seg_file_exist) { |
| return Status::InternalError("data file not existed: {}, rowset_id={}", seg_path, |
| rowset_id().to_string()); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::check_current_rowset_segment() { |
| const auto& fs = _rowset_meta->fs(); |
| if (!fs) { |
| return Status::InternalError("fs is not initialized, resource_id={}", |
| _rowset_meta->resource_id()); |
| } |
| |
| for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
| auto seg_path = DORIS_TRY(segment_path(seg_id)); |
| |
| std::shared_ptr<segment_v2::Segment> segment; |
| io::FileReaderOptions reader_options { |
| .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
| : io::FileCachePolicy::NO_CACHE, |
| .is_doris_table = true, |
| .cache_base_path {}, |
| .file_size = _rowset_meta->segment_file_size(seg_id), |
| }; |
| |
| auto s = segment_v2::Segment::open(fs, seg_path, _rowset_meta->tablet_id(), seg_id, |
| rowset_id(), _schema, reader_options, &segment, |
| _rowset_meta->inverted_index_file_info(seg_id)); |
| if (!s.ok()) { |
| LOG(WARNING) << "segment can not be opened. file=" << seg_path; |
| return s; |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::add_to_binlog() { |
| // FIXME(Drogon): not only local file system |
| if (!is_local()) { |
| DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
| return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
| _rowset_meta->tablet_id(), rowset_id().to_string()); |
| } |
| |
| const auto& fs = io::global_local_filesystem(); |
| auto segments_num = num_segments(); |
| VLOG_DEBUG << fmt::format("add rowset to binlog. rowset_id={}, segments_num={}", |
| rowset_id().to_string(), segments_num); |
| |
| Status status; |
| std::vector<std::string> linked_success_files; |
| Defer remove_linked_files {[&]() { // clear linked files if errors happen |
| if (!status.ok()) { |
| LOG(WARNING) << "will delete linked success files due to error " |
| << status.to_string_no_stack(); |
| std::vector<io::Path> paths; |
| for (auto& file : linked_success_files) { |
| paths.emplace_back(file); |
| LOG(WARNING) << "will delete linked success file " << file << " due to error"; |
| } |
| static_cast<void>(fs->batch_delete(paths)); |
| LOG(WARNING) << "done delete linked success files due to error " |
| << status.to_string_no_stack(); |
| } |
| }}; |
| |
| // The publish_txn might fail even if the add_to_binlog success, so we need to check |
| // whether a file already exists before linking. |
| auto errno_is_file_exists = []() { return Errno::no() == EEXIST; }; |
| |
| // all segments are in the same directory, so cache binlog_dir without multi times check |
| std::string binlog_dir; |
| for (int i = 0; i < segments_num; ++i) { |
| auto seg_file = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
| |
| if (binlog_dir.empty()) { |
| binlog_dir = std::filesystem::path(seg_file).parent_path().append("_binlog").string(); |
| |
| bool exists = true; |
| RETURN_IF_ERROR(fs->exists(binlog_dir, &exists)); |
| if (!exists) { |
| RETURN_IF_ERROR(fs->create_directory(binlog_dir)); |
| } |
| } |
| |
| auto binlog_file = |
| (std::filesystem::path(binlog_dir) / std::filesystem::path(seg_file).filename()) |
| .string(); |
| VLOG_DEBUG << "link " << seg_file << " to " << binlog_file; |
| if (!fs->link_file(seg_file, binlog_file).ok() && !errno_is_file_exists()) { |
| status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}", |
| seg_file, binlog_file, Errno::no()); |
| return status; |
| } |
| linked_success_files.push_back(binlog_file); |
| |
| if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
| for (const auto& index : _schema->inverted_indexes()) { |
| auto index_id = index->index_id(); |
| auto index_file = InvertedIndexDescriptor::get_index_file_path_v1( |
| InvertedIndexDescriptor::get_index_file_path_prefix(seg_file), index_id, |
| index->get_index_suffix()); |
| auto binlog_index_file = (std::filesystem::path(binlog_dir) / |
| std::filesystem::path(index_file).filename()) |
| .string(); |
| VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file; |
| if (!fs->link_file(index_file, binlog_index_file).ok() && !errno_is_file_exists()) { |
| status = Status::Error<OS_ERROR>( |
| "fail to create hard link. from={}, to={}, errno={}", index_file, |
| binlog_index_file, Errno::no()); |
| return status; |
| } |
| linked_success_files.push_back(binlog_index_file); |
| } |
| } else { |
| if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
| auto index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
| InvertedIndexDescriptor::get_index_file_path_prefix(seg_file)); |
| auto binlog_index_file = (std::filesystem::path(binlog_dir) / |
| std::filesystem::path(index_file).filename()) |
| .string(); |
| VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file; |
| if (!fs->link_file(index_file, binlog_index_file).ok() && !errno_is_file_exists()) { |
| status = Status::Error<OS_ERROR>( |
| "fail to create hard link. from={}, to={}, errno={}", index_file, |
| binlog_index_file, Errno::no()); |
| return status; |
| } |
| linked_success_files.push_back(binlog_index_file); |
| } |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::calc_file_crc(uint32_t* crc_value, int64_t* file_count) { |
| const auto& fs = _rowset_meta->fs(); |
| DBUG_EXECUTE_IF("fault_inject::BetaRowset::calc_file_crc", |
| { return Status::Error<OS_ERROR>("fault_inject calc_file_crc error"); }); |
| if (num_segments() < 1) { |
| *crc_value = 0x92a8fc17; // magic code from crc32c table |
| return Status::OK(); |
| } |
| |
| // 1. pick up all the files including dat file and idx file |
| std::vector<io::Path> file_paths; |
| for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
| auto seg_path = DORIS_TRY(segment_path(seg_id)); |
| file_paths.emplace_back(seg_path); |
| if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
| for (const auto& column : _schema->columns()) { |
| auto index_metas = _schema->inverted_indexs(*column); |
| for (const auto& index_meta : index_metas) { |
| std::string inverted_index_file = |
| InvertedIndexDescriptor::get_index_file_path_v1( |
| InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
| index_meta->index_id(), index_meta->get_index_suffix()); |
| file_paths.emplace_back(std::move(inverted_index_file)); |
| } |
| } |
| } else { |
| if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
| std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
| InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
| file_paths.emplace_back(std::move(inverted_index_file)); |
| } |
| } |
| } |
| *crc_value = 0; |
| *file_count = file_paths.size(); |
| if (!is_local()) { |
| return Status::OK(); |
| } |
| |
| // 2. calculate the md5sum of each file |
| const auto& local_fs = io::global_local_filesystem(); |
| DCHECK(!file_paths.empty()); |
| std::vector<std::string> all_file_md5; |
| all_file_md5.reserve(file_paths.size()); |
| for (const auto& file_path : file_paths) { |
| std::string file_md5sum; |
| auto status = local_fs->md5sum(file_path, &file_md5sum); |
| if (!status.ok()) { |
| return status; |
| } |
| VLOG_CRITICAL << fmt::format("calc file_md5sum finished. file_path={}, md5sum={}", |
| file_path.string(), file_md5sum); |
| all_file_md5.emplace_back(std::move(file_md5sum)); |
| } |
| std::sort(all_file_md5.begin(), all_file_md5.end()); |
| |
| // 3. calculate the crc_value based on all_file_md5 |
| DCHECK(file_paths.size() == all_file_md5.size()); |
| for (auto& i : all_file_md5) { |
| *crc_value = crc32c::Extend(*crc_value, (const uint8_t*)i.data(), i.size()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status BetaRowset::show_nested_index_file(rapidjson::Value* rowset_value, |
| rapidjson::Document::AllocatorType& allocator) { |
| const auto& fs = _rowset_meta->fs(); |
| auto storage_format = _schema->get_inverted_index_storage_format(); |
| std::string format_str; |
| switch (storage_format) { |
| case InvertedIndexStorageFormatPB::V1: |
| format_str = "V1"; |
| break; |
| case InvertedIndexStorageFormatPB::V2: |
| format_str = "V2"; |
| break; |
| case InvertedIndexStorageFormatPB::V3: |
| format_str = "V3"; |
| break; |
| default: |
| return Status::InternalError("inverted index storage format error"); |
| break; |
| } |
| auto rs_id = rowset_id().to_string(); |
| rowset_value->AddMember("rowset_id", rapidjson::Value(rs_id.c_str(), allocator), allocator); |
| rowset_value->AddMember("index_storage_format", rapidjson::Value(format_str.c_str(), allocator), |
| allocator); |
| rapidjson::Value segments(rapidjson::kArrayType); |
| for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
| rapidjson::Value segment(rapidjson::kObjectType); |
| segment.AddMember("segment_id", rapidjson::Value(seg_id).Move(), allocator); |
| |
| auto seg_path = DORIS_TRY(segment_path(seg_id)); |
| auto index_file_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(seg_path); |
| auto index_file_reader = std::make_unique<IndexFileReader>( |
| fs, std::string(index_file_path_prefix), storage_format); |
| RETURN_IF_ERROR(index_file_reader->init()); |
| auto dirs = index_file_reader->get_all_directories(); |
| |
| auto add_file_info_to_json = [&](const std::string& path, |
| rapidjson::Value& json_value) -> Status { |
| json_value.AddMember("idx_file_path", rapidjson::Value(path.c_str(), allocator), |
| allocator); |
| int64_t idx_file_size = 0; |
| auto st = fs->file_size(path, &idx_file_size); |
| if (st != Status::OK()) { |
| LOG(WARNING) << "show nested index file get file size error, file: " << path |
| << ", error: " << st.msg(); |
| return st; |
| } |
| json_value.AddMember("idx_file_size", rapidjson::Value(idx_file_size).Move(), |
| allocator); |
| return Status::OK(); |
| }; |
| |
| auto process_files = [&allocator, &index_file_reader](auto& index_meta, |
| rapidjson::Value& indices, |
| rapidjson::Value& index) -> Status { |
| rapidjson::Value files_value(rapidjson::kArrayType); |
| std::vector<std::string> files; |
| auto ret = index_file_reader->open(&index_meta); |
| if (!ret.has_value()) { |
| LOG(INFO) << "IndexFileReader open error:" << ret.error(); |
| return Status::InternalError("IndexFileReader open error"); |
| } |
| using T = std::decay_t<decltype(ret)>; |
| auto reader = std::forward<T>(ret).value(); |
| reader->list(&files); |
| for (auto& file : files) { |
| rapidjson::Value file_value(rapidjson::kObjectType); |
| auto size = reader->fileLength(file.c_str()); |
| file_value.AddMember("name", rapidjson::Value(file.c_str(), allocator), allocator); |
| file_value.AddMember("size", rapidjson::Value(size).Move(), allocator); |
| files_value.PushBack(file_value, allocator); |
| } |
| index.AddMember("files", files_value, allocator); |
| indices.PushBack(index, allocator); |
| return Status::OK(); |
| }; |
| |
| if (storage_format != InvertedIndexStorageFormatPB::V1) { |
| auto path = InvertedIndexDescriptor::get_index_file_path_v2(index_file_path_prefix); |
| auto st = add_file_info_to_json(path, segment); |
| if (!st.ok()) { |
| return st; |
| } |
| rapidjson::Value indices(rapidjson::kArrayType); |
| for (auto& dir : *dirs) { |
| rapidjson::Value index(rapidjson::kObjectType); |
| auto index_id = dir.first.first; |
| auto index_suffix = dir.first.second; |
| index.AddMember("index_id", rapidjson::Value(index_id).Move(), allocator); |
| index.AddMember("index_suffix", rapidjson::Value(index_suffix.c_str(), allocator), |
| allocator); |
| |
| rapidjson::Value files_value(rapidjson::kArrayType); |
| std::vector<std::string> files; |
| doris::TabletIndexPB index_pb; |
| index_pb.set_index_id(index_id); |
| index_pb.set_index_suffix_name(index_suffix); |
| TabletIndex index_meta; |
| index_meta.init_from_pb(index_pb); |
| |
| auto status = process_files(index_meta, indices, index); |
| if (!status.ok()) { |
| return status; |
| } |
| } |
| segment.AddMember("indices", indices, allocator); |
| segments.PushBack(segment, allocator); |
| } else { |
| rapidjson::Value indices(rapidjson::kArrayType); |
| for (auto column : _rowset_meta->tablet_schema()->columns()) { |
| auto index_metas = _rowset_meta->tablet_schema()->inverted_indexs(*column); |
| for (const auto& index_meta : index_metas) { |
| rapidjson::Value index(rapidjson::kObjectType); |
| auto index_id = index_meta->index_id(); |
| auto index_suffix = index_meta->get_index_suffix(); |
| index.AddMember("index_id", rapidjson::Value(index_id).Move(), allocator); |
| index.AddMember("index_suffix", |
| rapidjson::Value(index_suffix.c_str(), allocator), allocator); |
| auto path = InvertedIndexDescriptor::get_index_file_path_v1( |
| index_file_path_prefix, index_id, index_suffix); |
| RETURN_IF_ERROR(add_file_info_to_json(path, index)); |
| RETURN_IF_ERROR(process_files(*index_meta, indices, index)); |
| } |
| } |
| segment.AddMember("indices", indices, allocator); |
| segments.PushBack(segment, allocator); |
| } |
| } |
| rowset_value->AddMember("segments", segments, allocator); |
| return Status::OK(); |
| } |
| #include "common/compile_check_end.h" |
| } // namespace doris |