| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "olap/rowset/beta_rowset_writer.h" |
| |
| #include <assert.h> |
| // IWYU pragma: no_include <bthread/errno.h> |
| #include <errno.h> // IWYU pragma: keep |
| #include <fmt/format.h> |
| #include <stdio.h> |
| |
| #include <ctime> // time |
| #include <filesystem> |
| #include <memory> |
| #include <mutex> |
| #include <sstream> |
| #include <utility> |
| |
| // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "io/fs/file_reader.h" |
| #include "io/fs/file_system.h" |
| #include "io/fs/file_writer.h" |
| #include "olap/olap_define.h" |
| #include "olap/rowset/beta_rowset.h" |
| #include "olap/rowset/rowset_factory.h" |
| #include "olap/rowset/rowset_writer.h" |
| #include "olap/rowset/segcompaction.h" |
| #include "olap/rowset/segment_v2/inverted_index_cache.h" |
| #include "olap/rowset/segment_v2/inverted_index_desc.h" |
| #include "olap/rowset/segment_v2/segment.h" |
| #include "olap/rowset/segment_v2/segment_writer.h" |
| #include "olap/schema_change.h" |
| #include "olap/storage_engine.h" |
| #include "olap/tablet_schema.h" |
| #include "runtime/thread_context.h" |
| #include "util/debug_points.h" |
| #include "util/pretty_printer.h" |
| #include "util/slice.h" |
| #include "util/stopwatch.hpp" |
| #include "util/time.h" |
| #include "vec/columns/column.h" |
| #include "vec/common/schema_util.h" |
| #include "vec/core/block.h" |
| #include "vec/data_types/data_type_factory.hpp" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| using namespace ErrorCode; |
| |
| namespace { |
| |
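| // Returns true if the encoded key ranges of adjacent segments overlap. Assumes the |
| // input is ordered by segment id, comparing each segment's min key against the |
| // previous segment's max key. |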
| bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) { |
| std::string_view last; |
| for (auto&& segment_encode_key : segments_encoded_key_bounds) { |
| auto&& cur_min = segment_encode_key.min_key(); |
| auto&& cur_max = segment_encode_key.max_key(); |
| if (cur_min <= last) { |
| return true; |
| } |
| last = cur_max; |
| } |
| return false; |
| } |
| |
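| // Copies the statistics-related fields (row counts, disk sizes, segment count, |
| // state and key bounds) from spec_rowset_meta into rowset_meta and refreshes the |
| // creation time. |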
| void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, |
| const RowsetMeta& spec_rowset_meta) { |
| rowset_meta.set_num_rows(spec_rowset_meta.num_rows()); |
| rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size()); |
| rowset_meta.set_data_disk_size(spec_rowset_meta.data_disk_size()); |
| rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size()); |
| // TODO write zonemap to meta |
| rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0); |
| rowset_meta.set_creation_time(time(nullptr)); |
| rowset_meta.set_num_segments(spec_rowset_meta.num_segments()); |
| rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap()); |
| rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state()); |
| rowset_meta.set_segments_key_bounds_truncated( |
| spec_rowset_meta.is_segments_key_bounds_truncated()); |
| std::vector<KeyBoundsPB> segments_key_bounds; |
| spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds); |
| rowset_meta.set_segments_key_bounds(segments_key_bounds); |
| } |
| |
| } // namespace |
| |
| SegmentFileCollection::~SegmentFileCollection() = default; |
| |
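| // Registers the file writer for segment `seg_id`; rejects the call if the |
| // collection has already been closed. |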
| Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) { |
| std::lock_guard lock(_lock); |
| if (_closed) [[unlikely]] { |
| DCHECK(false) << writer->path(); |
| return Status::InternalError("add to closed SegmentFileCollection"); |
| } |
| |
| _file_writers.emplace(seg_id, std::move(writer)); |
| return Status::OK(); |
| } |
| |
| io::FileWriter* SegmentFileCollection::get(int seg_id) const { |
| std::lock_guard lock(_lock); |
| if (auto it = _file_writers.find(seg_id); it != _file_writers.end()) { |
| return it->second.get(); |
| } else { |
| return nullptr; |
| } |
| } |
| |
| Status SegmentFileCollection::close() { |
| { |
| std::lock_guard lock(_lock); |
| if (_closed) [[unlikely]] { |
| DCHECK(false); |
| return Status::InternalError("double close SegmentFileCollection"); |
| } |
| _closed = true; |
| } |
| |
| for (auto&& [_, writer] : _file_writers) { |
| if (writer->state() != io::FileWriter::State::CLOSED) { |
| RETURN_IF_ERROR(writer->close()); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
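| // Returns the size of each segment file, indexed by (seg_id - seg_id_offset). |
| // Must be called only after close(); reports an error on out-of-range ids, |
| // duplicates, or zero-sized segments. |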
| Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) { |
| std::lock_guard lock(_lock); |
| if (!_closed) [[unlikely]] { |
| DCHECK(false); |
| return ResultError(Status::InternalError("get segments file size before close")); |
| } |
| |
| Status st; |
| std::vector<size_t> seg_file_size(_file_writers.size(), 0); |
| bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) { |
| auto&& [seg_id, writer] = it; |
| |
| int idx = seg_id - seg_id_offset; |
| if (idx >= seg_file_size.size()) [[unlikely]] { |
| auto err_msg = fmt::format( |
| "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id, |
| seg_file_size.size(), seg_id_offset, writer->path().native()); |
| DCHECK(false) << err_msg; |
| st = Status::InternalError(err_msg); |
| return false; |
| } |
| |
| auto& fsize = seg_file_size[idx]; |
| if (fsize != 0) { |
| // File size should not have been set yet |
| auto err_msg = |
| fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native()); |
| DCHECK(false) << err_msg; |
| st = Status::InternalError(err_msg); |
| return false; |
| } |
| |
| fsize = writer->bytes_appended(); |
| if (fsize == 0) { |
| auto err_msg = |
| fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native()); |
| DCHECK(false) << err_msg; |
| st = Status::InternalError(err_msg); |
| return false; |
| } |
| |
| return true; |
| }); |
| |
| if (succ) { |
| return seg_file_size; |
| } |
| |
| return ResultError(st); |
| } |
| |
| InvertedIndexFileCollection::~InvertedIndexFileCollection() = default; |
| |
| Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_writer) { |
| std::lock_guard lock(_lock); |
| if (_inverted_index_file_writers.find(seg_id) != _inverted_index_file_writers.end()) |
| [[unlikely]] { |
| DCHECK(false); |
| return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id); |
| } |
| _inverted_index_file_writers.emplace(seg_id, std::move(index_writer)); |
| return Status::OK(); |
| } |
| |
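| // First phase of closing: asks every index file writer to begin closing and |
| // accumulates the total index file size into _total_size. |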
| Status InvertedIndexFileCollection::begin_close() { |
| std::lock_guard lock(_lock); |
| for (auto&& [id, writer] : _inverted_index_file_writers) { |
| RETURN_IF_ERROR(writer->begin_close()); |
| _total_size += writer->get_index_file_total_size(); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status InvertedIndexFileCollection::finish_close() { |
| std::lock_guard lock(_lock); |
| for (auto&& [id, writer] : _inverted_index_file_writers) { |
| RETURN_IF_ERROR(writer->finish_close()); |
| } |
| return Status::OK(); |
| } |
| |
| Result<std::vector<const InvertedIndexFileInfo*>> |
| InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) { |
| std::lock_guard lock(_lock); |
| |
| Status st; |
| std::vector<const InvertedIndexFileInfo*> idx_file_info(_inverted_index_file_writers.size()); |
| bool succ = std::all_of( |
| _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(), |
| [&](auto&& it) { |
| auto&& [seg_id, writer] = it; |
| |
| int idx = seg_id - seg_id_offset; |
| if (idx >= idx_file_info.size()) [[unlikely]] { |
| auto err_msg = |
| fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}", |
| seg_id, idx_file_info.size(), seg_id_offset); |
| DCHECK(false) << err_msg; |
| st = Status::InternalError(err_msg); |
| return false; |
| } |
| idx_file_info[idx] = writer->get_index_file_info(); |
| return true; |
| }); |
| |
| if (succ) { |
| return idx_file_info; |
| } |
| |
| return ResultError(st); |
| } |
| |
| BaseBetaRowsetWriter::BaseBetaRowsetWriter() |
| : _num_segment(0), |
| _segment_start_id(0), |
| _num_rows_written(0), |
| _total_data_size(0), |
| _total_index_size(0), |
| _segment_creator(_context, _seg_files, _idx_files) {} |
| |
| BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) |
| : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {} |
| |
| BaseBetaRowsetWriter::~BaseBetaRowsetWriter() { |
| if (!_already_built && _rowset_meta->is_local()) { |
| // abnormal exit, remove all files generated |
| auto& fs = io::global_local_filesystem(); |
| for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) { |
| std::string seg_path = |
| local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), i); |
| // Even if an error is encountered, these files that have not been cleaned up |
| // will be cleaned up by the GC background. So here we only print the error |
| // message when we encounter an error. |
| WARN_IF_ERROR(fs->delete_file(seg_path), |
| fmt::format("Failed to delete file={}", seg_path)); |
| } |
| } |
| } |
| |
| BetaRowsetWriter::~BetaRowsetWriter() { |
| /* Note that segcompaction is async and runs in parallel with the load job, so cancellation must |
| * be handled carefully. Although it is pointless to continue segcompaction once the job is |
| * cancelled, the objects involved in the job must be kept alive until segcompaction finishes to |
| * avoid crashes due to memory issues. */ |
| WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed"); |
| |
| if (_calc_delete_bitmap_token != nullptr) { |
| _calc_delete_bitmap_token->cancel(); |
| } |
| } |
| |
| Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
| _context = rowset_writer_context; |
| _rowset_meta.reset(new RowsetMeta); |
| if (_context.storage_resource) { |
| _rowset_meta->set_remote_storage_resource(*_context.storage_resource); |
| } |
| _rowset_meta->set_rowset_id(_context.rowset_id); |
| _rowset_meta->set_partition_id(_context.partition_id); |
| _rowset_meta->set_tablet_id(_context.tablet_id); |
| _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash); |
| _rowset_meta->set_rowset_type(_context.rowset_type); |
| _rowset_meta->set_rowset_state(_context.rowset_state); |
| _rowset_meta->set_segments_overlap(_context.segments_overlap); |
| if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) { |
| _is_pending = true; |
| _rowset_meta->set_txn_id(_context.txn_id); |
| _rowset_meta->set_load_id(_context.load_id); |
| } else { |
| _rowset_meta->set_version(_context.version); |
| _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); |
| } |
| _rowset_meta->set_tablet_uid(_context.tablet_uid); |
| _rowset_meta->set_tablet_schema(_context.tablet_schema); |
| _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this); |
| _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this); |
| return Status::OK(); |
| } |
| |
| Status BaseBetaRowsetWriter::add_block(const vectorized::Block* block) { |
| return _segment_creator.add_block(block); |
| } |
| |
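| // For merge-on-write tables (excluding partial updates), submits an async task that |
| // closes the segment's file writer, builds a temporary rowset, loads the segment and |
| // calculates its delete bitmap against the rowsets captured in the MoW context. |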
| Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { |
| SCOPED_RAW_TIMER(&_delete_bitmap_ns); |
| if (!_context.tablet->enable_unique_key_merge_on_write() || |
| (_context.partial_update_info && _context.partial_update_info->is_partial_update())) { |
| return Status::OK(); |
| } |
| std::vector<RowsetSharedPtr> specified_rowsets; |
| { |
| std::shared_lock meta_rlock(_context.tablet->get_header_lock()); |
| specified_rowsets = |
| _context.tablet->get_rowset_by_ids(_context.mow_context->rowset_ids.get()); |
| } |
| |
| // Submit the entire delete bitmap calculation process to the thread pool for async execution. |
| // This avoids blocking the memtable flush thread while waiting for the file upload to complete. |
| // The process includes: file_writer->close(), _build_tmp, load_segments, and calc_delete_bitmap. |
| return _calc_delete_bitmap_token->submit_func( |
| [this, segment_id, specified_rowsets = std::move(specified_rowsets)]() -> Status { |
| Status st = Status::OK(); |
| // Step 1: Close file_writer (must be done before load_segments) |
| auto* file_writer = _seg_files.get(segment_id); |
| if (file_writer && file_writer->state() != io::FileWriter::State::CLOSED) { |
| MonotonicStopWatch close_timer; |
| close_timer.start(); |
| st = file_writer->close(); |
| close_timer.stop(); |
| |
| auto close_time_ms = close_timer.elapsed_time_milliseconds(); |
| if (close_time_ms > 1000) { |
| LOG(INFO) << "file_writer->close() took " << close_time_ms |
| << "ms for segment_id=" << segment_id |
| << ", tablet_id=" << _context.tablet_id |
| << ", rowset_id=" << _context.rowset_id; |
| } |
| if (!st.ok()) { |
| return st; |
| } |
| } |
| |
| OlapStopWatch watch; |
| // Step 2: Build tmp rowset (needs file_writer to be closed) |
| RowsetSharedPtr rowset_ptr; |
| st = _build_tmp(rowset_ptr); |
| if (!st.ok()) { |
| return st; |
| } |
| |
| // Step 3: Load segments (needs file_writer to be closed and rowset to be built) |
| auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get()); |
| std::vector<segment_v2::SegmentSharedPtr> segments; |
| st = beta_rowset->load_segments(segment_id, segment_id + 1, &segments); |
| if (!st.ok()) { |
| return st; |
| } |
| |
| // Step 4: Calculate delete bitmap |
| st = BaseTablet::calc_delete_bitmap( |
| _context.tablet, rowset_ptr, segments, specified_rowsets, |
| _context.mow_context->delete_bitmap, _context.mow_context->max_version, |
| nullptr, nullptr, nullptr); |
| if (!st.ok()) { |
| return st; |
| } |
| |
| size_t total_rows = |
| std::accumulate(segments.begin(), segments.end(), size_t(0), |
| [](size_t sum, const segment_v2::SegmentSharedPtr& s) { |
| return sum + s->num_rows(); |
| }); |
| LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " |
| << _context.tablet->tablet_id() |
| << ", rowset_ids: " << _context.mow_context->rowset_ids->size() |
| << ", cur max_version: " << _context.mow_context->max_version |
| << ", transaction_id: " << _context.mow_context->txn_id |
| << ", delete_bitmap_count: " |
| << _context.mow_context->delete_bitmap->get_delete_bitmap_count() |
| << ", delete_bitmap_cardinality: " |
| << _context.mow_context->delete_bitmap->cardinality() |
| << ", cost: " << watch.get_elapse_time_us() |
| << "(us), total rows: " << total_rows; |
| return Status::OK(); |
| }); |
| } |
| |
| Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
| RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context)); |
| if (_segcompaction_worker) { |
| _segcompaction_worker->init_mem_tracker(rowset_writer_context); |
| } |
| if (_context.mow_context != nullptr) { |
| _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token(); |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, |
| int32_t segment_id) { |
| DCHECK(_rowset_meta->is_local()); |
| auto fs = _rowset_meta->fs(); |
| if (!fs) { |
| return Status::Error<INIT_FAILED>( |
| "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed"); |
| } |
| auto path = |
| local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), segment_id); |
| io::FileReaderOptions reader_options { |
| .cache_type = |
| _context.write_file_cache |
| ? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
| : io::FileCachePolicy::NO_CACHE) |
| : io::FileCachePolicy::NO_CACHE, |
| .is_doris_table = true, |
| .cache_base_path {}, |
| }; |
| auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id, rowset_id(), |
| _context.tablet_schema, reader_options, &segment); |
| if (!s.ok()) { |
| LOG(WARNING) << "failed to open segment. " << path << ":" << s; |
| return s; |
| } |
| return Status::OK(); |
| } |
| |
| /* Policy for selecting segcompaction targets: |
| * 1. skip big segments |
| * 2. if a run of consecutive small segments ends with a big one, compact the |
| * small ones, unless there is only a single small segment |
| * 3. if a run of consecutive small segments ends with a small one, compact |
| * them only if the run length exceeds (config::segcompaction_batch_size / 2) |
| */ |
| Status BetaRowsetWriter::_find_longest_consecutive_small_segment( |
| SegCompactionCandidatesSharedPtr& segments) { |
| segments = std::make_shared<SegCompactionCandidates>(); |
| // skip last (maybe active) segment |
| int32_t last_segment = _num_segment - 1; |
| size_t task_bytes = 0; |
| uint32_t task_rows = 0; |
| int32_t segid; |
| for (segid = _segcompacted_point; |
| segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) { |
| segment_v2::SegmentSharedPtr segment; |
| RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid)); |
| const auto segment_rows = segment->num_rows(); |
| const auto segment_bytes = segment->file_reader()->size(); |
| bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows || |
| segment_bytes > config::segcompaction_candidate_max_bytes; |
| if (is_large_segment) { |
| if (segid == _segcompacted_point) { |
| // skip large segments at the front |
| auto dst_seg_id = _num_segcompacted.load(); |
| RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
| if (_segcompaction_worker->need_convert_delete_bitmap()) { |
| _segcompaction_worker->convert_segment_delete_bitmap( |
| _context.mow_context->delete_bitmap, segid, dst_seg_id); |
| } |
| continue; |
| } else { |
| // stop because we need consecutive segments |
| break; |
| } |
| } |
| bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows || |
| task_bytes + segment_bytes > config::segcompaction_task_max_bytes; |
| if (is_task_full) { |
| break; |
| } |
| segments->push_back(segment); |
| task_rows += segment->num_rows(); |
| task_bytes += segment->file_reader()->size(); |
| } |
| size_t s = segments->size(); |
| if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) { |
| // we didn't collect enough segments; better to do it in the next |
| // round so that more can be compacted at once |
| segments->clear(); |
| return Status::OK(); |
| } |
| if (s == 1) { // poor bachelor, let it go |
| VLOG_DEBUG << "only one candidate segment"; |
| auto src_seg_id = _segcompacted_point.load(); |
| auto dst_seg_id = _num_segcompacted.load(); |
| RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
| if (_segcompaction_worker->need_convert_delete_bitmap()) { |
| _segcompaction_worker->convert_segment_delete_bitmap( |
| _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id); |
| } |
| segments->clear(); |
| return Status::OK(); |
| } |
| if (VLOG_DEBUG_IS_ON) { |
| vlog_buffer.clear(); |
| for (auto& segment : *segments) { |
| fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows()); |
| } |
| VLOG_DEBUG << "candidate segments num:" << s |
| << " list of candidates:" << fmt::to_string(vlog_buffer); |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) { |
| int ret; |
| auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
| _context.rowset_id, begin, end); |
| auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
| _num_segcompacted); |
| ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
| if (ret) { |
| return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
| "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
| errno); |
| } |
| RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
| |
| // rename inverted index files |
| RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0)); |
| |
| _num_segcompacted++; |
| return Status::OK(); |
| } |
| |
| void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint32_t begin, |
| uint32_t end) { |
| VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end; |
| for (uint32_t i = begin; i <= end; ++i) { |
| _segid_statistics_map.erase(i); |
| } |
| } |
| |
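| // Renames a segment skipped by segcompaction from its original seg_id to the next |
| // compacted id (_num_segcompacted), moving its statistics entry, evicting stale |
| // caches and renaming its inverted index files along the way. |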
| Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) { |
| if (seg_id == _num_segcompacted) { |
| ++_num_segcompacted; |
| return Status::OK(); |
| } |
| |
| auto src_seg_path = |
| local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), seg_id); |
| auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
| _num_segcompacted); |
| VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to " |
| << dst_seg_path; |
| { |
| std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
| DCHECK(_segid_statistics_map.find(seg_id) != _segid_statistics_map.end()); |
| DCHECK(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end()); |
| auto org = _segid_statistics_map[seg_id]; |
| _segid_statistics_map.emplace(_num_segcompacted, org); |
| _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id); |
| } |
| int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
| if (ret) { |
| return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
| "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
| errno); |
| } |
| |
| RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
| // rename remaining inverted index files |
| RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id)); |
| |
| ++_num_segcompacted; |
| return Status::OK(); |
| } |
| |
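| // Evicts any cached footer page for the segment file at segment_path from the |
| // storage page cache; called after a segment file is renamed so that a stale |
| // cached footer is not reused. |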
| Status BetaRowsetWriter::_remove_segment_footer_cache(const uint32_t seg_id, |
| const std::string& segment_path) { |
| auto* footer_page_cache = ExecEnv::GetInstance()->get_storage_page_cache(); |
| if (!footer_page_cache) { |
| return Status::OK(); |
| } |
| |
| auto fs = _rowset_meta->fs(); |
| bool exists = false; |
| RETURN_IF_ERROR(fs->exists(segment_path, &exists)); |
| if (exists) { |
| io::FileReaderSPtr file_reader; |
| io::FileReaderOptions reader_options { |
| .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
| : io::FileCachePolicy::NO_CACHE, |
| .is_doris_table = true, |
| .cache_base_path = "", |
| .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)), |
| .tablet_id = _rowset_meta->tablet_id(), |
| }; |
| RETURN_IF_ERROR(fs->open_file(segment_path, &file_reader, &reader_options)); |
| DCHECK(file_reader != nullptr); |
| auto cache_key = segment_v2::Segment::get_segment_footer_cache_key(file_reader); |
| footer_page_cache->erase(cache_key, segment_v2::PageTypePB::INDEX_PAGE); |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) { |
| int ret; |
| |
| auto src_seg_path = begin < 0 ? local_segment_path(_context.tablet_path, |
| _context.rowset_id.to_string(), seg_id) |
| : BetaRowset::local_segment_path_segcompacted( |
| _context.tablet_path, _context.rowset_id, begin, end); |
| auto src_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(src_seg_path); |
| auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
| _num_segcompacted); |
| auto dst_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(dst_seg_path); |
| |
| if (_context.tablet_schema->get_inverted_index_storage_format() >= |
| InvertedIndexStorageFormatPB::V2) { |
| if (_context.tablet_schema->has_inverted_index() || |
| _context.tablet_schema->has_ann_index()) { |
| auto src_idx_path = |
| InvertedIndexDescriptor::get_index_file_path_v2(src_index_path_prefix); |
| auto dst_idx_path = |
| InvertedIndexDescriptor::get_index_file_path_v2(dst_index_path_prefix); |
| |
| ret = rename(src_idx_path.c_str(), dst_idx_path.c_str()); |
| if (ret) { |
| return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
| "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, dst_idx_path, |
| ret, errno); |
| } |
| } |
| } |
| // rename remaining inverted index files |
| for (const auto& column : _context.tablet_schema->columns()) { |
| auto index_infos = _context.tablet_schema->inverted_indexs(*column); |
| for (const auto& index_info : index_infos) { |
| auto index_id = index_info->index_id(); |
| if (_context.tablet_schema->get_inverted_index_storage_format() == |
| InvertedIndexStorageFormatPB::V1) { |
| auto src_idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
| src_index_path_prefix, index_id, index_info->get_index_suffix()); |
| auto dst_idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
| dst_index_path_prefix, index_id, index_info->get_index_suffix()); |
| VLOG_DEBUG << "segcompaction skip this index. rename " << src_idx_path << " to " |
| << dst_idx_path; |
| ret = rename(src_idx_path.c_str(), dst_idx_path.c_str()); |
| if (ret) { |
| return Status::Error<INVERTED_INDEX_RENAME_FILE_FAILED>( |
| "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, |
| dst_idx_path, ret, errno); |
| } |
| } |
| // Erase the origin index file cache |
| auto src_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
| src_index_path_prefix, index_id, index_info->get_index_suffix()); |
| auto dst_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
| dst_index_path_prefix, index_id, index_info->get_index_suffix()); |
| RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(src_idx_cache_key)); |
| RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(dst_idx_cache_key)); |
| } |
| } |
| return Status::OK(); |
| } |
| |
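| // Decides whether to kick off a segcompaction task: enforces the segment number |
| // limit, makes sure only one segcompaction is in flight, and submits the longest |
| // run of consecutive small segments once enough of them have accumulated. |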
| Status BetaRowsetWriter::_segcompaction_if_necessary() { |
| Status status = Status::OK(); |
| // if not doing segcompaction, just check segment number |
| if (!config::enable_segcompaction || !_context.enable_segcompaction || |
| _context.tablet_schema->num_variant_columns() > 0) { |
| return _check_segment_number_limit(_num_segment); |
| } |
| // leave _is_doing_segcompaction as the last condition |
| // otherwise _segcompacting_cond will never get notified |
| if (_is_doing_segcompaction.exchange(true)) { |
| return status; |
| } |
| if (_segcompaction_status.load() != OK) { |
| status = Status::Error<SEGCOMPACTION_FAILED>( |
| "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}", |
| _segcompaction_status.load()); |
| } else { |
| status = _check_segment_number_limit(_num_segcompacted); |
| } |
| if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) { |
| SegCompactionCandidatesSharedPtr segments; |
| status = _find_longest_consecutive_small_segment(segments); |
| if (LIKELY(status.ok()) && (!segments->empty())) { |
| LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id |
| << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment |
| << ", segcompacted_point:" << _segcompacted_point; |
| status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments); |
| if (status.ok()) { |
| return status; |
| } |
| } |
| } |
| { |
| std::lock_guard lk(_is_doing_segcompaction_lock); |
| _is_doing_segcompaction = false; |
| _segcompacting_cond.notify_all(); |
| } |
| return status; |
| } |
| |
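| // Renames the segments behind _segcompacted_point to their final ids without |
| // compacting them, so the transaction can be committed without waiting. |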
| Status BetaRowsetWriter::_segcompaction_rename_last_segments() { |
| DCHECK_EQ(_is_doing_segcompaction, false); |
| if (!config::enable_segcompaction) { |
| return Status::OK(); |
| } |
| if (_segcompaction_status.load() != OK) { |
| return Status::Error<SEGCOMPACTION_FAILED>( |
| "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state, error " |
| "code: {}", |
| _segcompaction_status.load()); |
| } |
| if (!is_segcompacted() || _segcompacted_point == _num_segment) { |
| // nothing to do if segcompaction never ran or all segments are already segcompacted |
| return Status::OK(); |
| } |
| // currently we only rename the remaining segments to reduce wait time |
| // so that the transaction can be committed ASAP |
| VLOG_DEBUG << "segcompaction last few segments"; |
| for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) { |
| auto dst_segid = _num_segcompacted.load(); |
| RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
| if (_segcompaction_worker->need_convert_delete_bitmap()) { |
| _segcompaction_worker->convert_segment_delete_bitmap( |
| _context.mow_context->delete_bitmap, segid, dst_segid); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { |
| assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET); |
| RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id)); |
| _num_rows_written += rowset->num_rows(); |
| const auto& rowset_meta = rowset->rowset_meta(); |
| auto index_size = rowset_meta->index_disk_size(); |
| auto total_size = rowset_meta->total_disk_size(); |
| auto data_size = rowset_meta->data_disk_size(); |
| // The index size may be corrupted by a bug in versions before 2.1.5 or 3.0.0, |
| // so try to get the real index size from disk. |
| if (index_size < 0 || index_size > total_size * 2) { |
| LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size |
| << " data size:" << data_size << " tablet:" << rowset_meta->tablet_id() |
| << " rowset:" << rowset_meta->rowset_id(); |
| index_size = 0; |
| auto st = rowset->get_inverted_index_size(&index_size); |
| if (!st.ok()) { |
| if (!st.is<NOT_FOUND>()) { |
| LOG(ERROR) << "failed to get inverted index size. res=" << st; |
| return st; |
| } |
| } |
| } |
| _total_data_size += data_size; |
| _total_index_size += index_size; |
| _num_segment += cast_set<int32_t>(rowset->num_segments()); |
| // append key_bounds to current rowset |
| RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds)); |
| _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated(); |
| |
| // TODO update zonemap |
| if (rowset->rowset_meta()->has_delete_predicate()) { |
| _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate()); |
| } |
| // Update the tablet schema in the rowset metadata if the tablet schema contains a variant. |
| // During the build process, _context.tablet_schema will be used as the rowset schema. |
| // This situation may arise in the event of a linked schema change. If this schema is not set, |
| // the subcolumns of the variant will be lost. |
| if (_context.tablet_schema->num_variant_columns() > 0 && rowset->tablet_schema() != nullptr) { |
| _context.tablet_schema = rowset->tablet_schema(); |
| } |
| return Status::OK(); |
| } |
| |
| Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) { |
| // TODO use schema_mapping to transfer zonemap |
| return add_rowset(rowset); |
| } |
| |
| Status BaseBetaRowsetWriter::flush() { |
| return _segment_creator.flush(); |
| } |
| |
| Status BaseBetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segment_id, |
| int64_t* flush_size) { |
| if (block->rows() == 0) { |
| return Status::OK(); |
| } |
| |
| { |
| SCOPED_RAW_TIMER(&_segment_writer_ns); |
| RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size)); |
| } |
| return Status::OK(); |
| } |
| |
| Status BaseBetaRowsetWriter::flush_single_block(const vectorized::Block* block) { |
| return _segment_creator.flush_single_block(block); |
| } |
| |
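| // Blocks until any in-flight segcompaction finishes and propagates its failure |
| // status, logging a message if the wait exceeds one second. |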
| Status BetaRowsetWriter::_wait_flying_segcompaction() { |
| std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock); |
| uint64_t begin_wait = GetCurrentTimeMicros(); |
| while (_is_doing_segcompaction) { |
| // change sync wait to async? |
| _segcompacting_cond.wait(l); |
| } |
| uint64_t elapsed = GetCurrentTimeMicros() - begin_wait; |
| if (elapsed >= MICROS_PER_SEC) { |
| LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us"; |
| } |
| if (_segcompaction_status.load() != OK) { |
| return Status::Error<SEGCOMPACTION_FAILED>( |
| "BetaRowsetWriter meet invalid state, error code: {}", |
| _segcompaction_status.load()); |
| } |
| return Status::OK(); |
| } |
| |
| RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) { |
| if (_rowset_meta->newest_write_timestamp() == -1) { |
| _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
| } |
| |
| build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta); |
| RowsetSharedPtr rowset; |
| auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, |
| _rowset_meta, &rowset); |
| if (!status.ok()) { |
| LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; |
| return nullptr; |
| } |
| _already_built = true; |
| return rowset; |
| } |
| |
| Status BaseBetaRowsetWriter::_close_file_writers() { |
| // Flush and close segment files |
| RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(), |
| "failed to close segment creator when build new rowset"); |
| return Status::OK(); |
| } |
| |
| Status BetaRowsetWriter::_close_file_writers() { |
| RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers()); |
| // If _segment_start_id is not zero, this is a transient rowset writer for a |
| // MoW partial update, so there is no need to do segment compaction. |
| if (_segment_start_id == 0) { |
| if (_segcompaction_worker->cancel()) { |
| std::lock_guard lk(_is_doing_segcompaction_lock); |
| _is_doing_segcompaction = false; |
| _segcompacting_cond.notify_all(); |
| } else { |
| RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(), |
| "segcompaction failed when build new rowset"); |
| } |
| RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(), |
| "rename last segments failed when build new rowset"); |
| // the segcompaction worker closes its file writer in compact_segments |
| if (auto& seg_comp_file_writer = _segcompaction_worker->get_file_writer(); |
| nullptr != seg_comp_file_writer && |
| seg_comp_file_writer->state() != io::FileWriter::State::CLOSED) { |
| RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(), |
| "close segment compaction worker failed"); |
| } |
| // process delete bitmap for mow table |
| if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) { |
| auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap(); |
| // a non-null bitmap means segment compaction was actually triggered |
| if (converted_delete_bitmap != nullptr) { |
| RowsetIdUnorderedSet rowsetids; |
| rowsetids.insert(rowset_id()); |
| context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(), |
| rowsetids); |
| context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0}, |
| {rowset_id(), UINT32_MAX, INT64_MAX}); |
| context().mow_context->delete_bitmap->merge(*converted_delete_bitmap); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { |
| if (_calc_delete_bitmap_token != nullptr) { |
| RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); |
| } |
| RETURN_IF_ERROR(_close_file_writers()); |
| const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; |
| RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num), |
| "too many segments when build new rowset"); |
| RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true)); |
| if (_is_pending) { |
| _rowset_meta->set_rowset_state(COMMITTED); |
| } else { |
| _rowset_meta->set_rowset_state(VISIBLE); |
| } |
| |
| if (_rowset_meta->newest_write_timestamp() == -1) { |
| _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
| } |
| |
| _rowset_meta->set_tablet_schema(_context.tablet_schema); |
| |
| // If segment compaction occurs, the idx file info will become inaccurate. |
| if ((_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) && |
| _num_segcompacted == 0) { |
| if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id); |
| !idx_files_info.has_value()) [[unlikely]] { |
| LOG(ERROR) << "expected inverted index files info, but none presents: " |
| << idx_files_info.error(); |
| } else { |
| _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); |
| } |
| } |
| |
| RETURN_NOT_OK_STATUS_WITH_WARN( |
| RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta, |
| &rowset), |
| "rowset init failed when build new rowset"); |
| _already_built = true; |
| return Status::OK(); |
| } |
| |
| int64_t BaseBetaRowsetWriter::_num_seg() const { |
| return _num_segment; |
| } |
| |
| int64_t BetaRowsetWriter::_num_seg() const { |
| return is_segcompacted() ? _num_segcompacted : _num_segment; |
| } |
| |
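| // Aggregates per-segment statistics and key bounds into rowset_meta, updates the |
| // overlap property, and (optionally) verifies that the number of key bounds |
| // matches the number of segments. |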
| Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) { |
| int64_t num_rows_written = 0; |
| int64_t total_data_size = 0; |
| int64_t total_index_size = 0; |
| std::vector<KeyBoundsPB> segments_encoded_key_bounds; |
| { |
| std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
| for (const auto& itr : _segid_statistics_map) { |
| num_rows_written += itr.second.row_num; |
| total_data_size += itr.second.data_size; |
| total_index_size += itr.second.index_size; |
| segments_encoded_key_bounds.push_back(itr.second.key_bounds); |
| } |
| } |
| for (auto& key_bound : _segments_encoded_key_bounds) { |
| segments_encoded_key_bounds.push_back(key_bound); |
| } |
| if (_segments_key_bounds_truncated.has_value()) { |
| rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value()); |
| } |
| // Segment key bounds are empty in old versions (before 1.2.x), so we should not modify |
| // the overlap property when the key bounds are empty. |
| // For MoW tables with cluster keys, the overlap property describes the cluster keys |
| // while key_bounds holds the primary keys. |
| if (!segments_encoded_key_bounds.empty() && |
| !is_segment_overlapping(segments_encoded_key_bounds) && |
| _context.tablet_schema->cluster_key_uids().empty()) { |
| rowset_meta->set_segments_overlap(NONOVERLAPPING); |
| } |
| |
| auto segment_num = _num_seg(); |
| if (check_segment_num && config::check_segment_when_build_rowset_meta) { |
| auto segments_encoded_key_bounds_size = segments_encoded_key_bounds.size(); |
| if (segments_encoded_key_bounds_size != segment_num) { |
| return Status::InternalError( |
| "segments_encoded_key_bounds_size should equal to _num_seg, " |
| "segments_encoded_key_bounds_size " |
| "is: {}, _num_seg is: {}", |
| segments_encoded_key_bounds_size, segment_num); |
| } |
| } |
| |
| rowset_meta->set_num_segments(segment_num); |
| rowset_meta->set_num_rows(num_rows_written + _num_rows_written); |
| rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size + |
| _total_index_size); |
| rowset_meta->set_data_disk_size(total_data_size + _total_data_size); |
| rowset_meta->set_index_disk_size(total_index_size + _total_index_size); |
| rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds); |
| // TODO write zonemap to meta |
| rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0); |
| rowset_meta->set_creation_time(time(nullptr)); |
| return Status::OK(); |
| } |
| |
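| // Builds a temporary rowset from the current (possibly partial) rowset meta so |
| // that already-flushed segments can be loaded, e.g. for delete bitmap calculation. |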
| Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) { |
| Status status; |
| std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>(); |
| tmp_rs_meta->init(_rowset_meta.get()); |
| |
| status = _build_rowset_meta(tmp_rs_meta.get()); |
| if (!status.ok()) { |
| LOG(WARNING) << "failed to build rowset meta, res=" << status; |
| return status; |
| } |
| |
| status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, tmp_rs_meta, |
| &rowset_ptr); |
| DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed", |
| { status = Status::InternalError("create rowset failed"); }); |
| if (!status.ok()) { |
| LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; |
| return status; |
| } |
| return Status::OK(); |
| } |
| |
| Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path, |
| io::FileWriterPtr& file_writer) { |
| io::FileWriterOptions opts = _context.get_file_writer_options(); |
| Status st = _context.fs()->create_file(path, &file_writer, &opts); |
| if (!st.ok()) { |
| LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; |
| return st; |
| } |
| |
| DCHECK(file_writer != nullptr); |
| return Status::OK(); |
| } |
| |
| Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer, |
| FileType file_type) { |
| auto segment_path = _context.segment_path(segment_id); |
| if (file_type == FileType::INVERTED_INDEX_FILE) { |
| std::string prefix = |
| std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)}; |
| std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
| return _create_file_writer(index_path, file_writer); |
| } else if (file_type == FileType::SEGMENT_FILE) { |
| return _create_file_writer(segment_path, file_writer); |
| } |
| return Status::Error<ErrorCode::INTERNAL_ERROR>( |
| fmt::format("failed to create file = {}, file type = {}", segment_path, file_type)); |
| } |
| |
| Status BaseBetaRowsetWriter::create_index_file_writer(uint32_t segment_id, |
| IndexFileWriterPtr* index_file_writer) { |
| RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(segment_id, index_file_writer)); |
| // used for inverted index format v1 |
| (*index_file_writer)->set_file_writer_opts(_context.get_file_writer_options()); |
| return Status::OK(); |
| } |
| |
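| // Creates the segment writer (and, if the schema has indexes, the index file |
| // writer) that receives the compacted output of segments [begin, end], and hands |
| // both writers over to the segcompaction worker. |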
| Status BetaRowsetWriter::create_segment_writer_for_segcompaction( |
| std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) { |
| DCHECK(begin >= 0 && end >= 0); |
| std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
| _context.rowset_id, begin, end); |
| io::FileWriterPtr file_writer; |
| RETURN_IF_ERROR(_create_file_writer(path, file_writer)); |
| |
| IndexFileWriterPtr index_file_writer; |
| if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) { |
| io::FileWriterPtr idx_file_writer; |
| std::string prefix(InvertedIndexDescriptor::get_index_file_path_prefix(path)); |
| if (_context.tablet_schema->get_inverted_index_storage_format() != |
| InvertedIndexStorageFormatPB::V1) { |
| std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
| RETURN_IF_ERROR(_create_file_writer(index_path, idx_file_writer)); |
| } |
| index_file_writer = std::make_unique<IndexFileWriter>( |
| _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted, |
| _context.tablet_schema->get_inverted_index_storage_format(), |
| std::move(idx_file_writer)); |
| } |
| |
| segment_v2::SegmentWriterOptions writer_options; |
| writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; |
| writer_options.rowset_ctx = &_context; |
| // segcompaction output is always written as a compaction write |
| writer_options.write_type = DataWriteType::TYPE_COMPACTION; |
| writer_options.max_rows_per_segment = _context.max_rows_per_segment; |
| writer_options.mow_ctx = _context.mow_context; |
| |
| *writer = std::make_unique<segment_v2::SegmentWriter>( |
| file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet, |
| _context.data_dir, writer_options, index_file_writer.get()); |
| if (auto& seg_writer = _segcompaction_worker->get_file_writer(); |
| seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) { |
| RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close()); |
| } |
| _segcompaction_worker->get_file_writer().reset(file_writer.release()); |
| if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer(); |
| idx_file_writer != nullptr) { |
| RETURN_IF_ERROR(idx_file_writer->begin_close()); |
| RETURN_IF_ERROR(idx_file_writer->finish_close()); |
| } |
| _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release()); |
| return Status::OK(); |
| } |
| |
| Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) { |
| DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", |
| { segnum = dp->param("segnum", 1024); }); |
| if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { |
| return Status::Error<TOO_MANY_SEGMENTS>( |
| "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, " |
| "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too " |
| "small or if the data is skewed.", |
| _context.tablet_id, _context.rowset_id.to_string(), |
| config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows()); |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) { |
| DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", |
| { segnum = dp->param("segnum", 1024); }); |
| if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { |
| return Status::Error<TOO_MANY_SEGMENTS>( |
| "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, " |
| "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if " |
| "the bucket number is too small or if the data is skewed.", |
| _context.tablet_id, _context.rowset_id.to_string(), |
| config::max_segment_num_per_rowset, _num_segment, _segcompacted_point, |
| _num_segcompacted, get_rowset_num_rows()); |
| } |
| return Status::OK(); |
| } |
| |
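| // Records the statistics of a flushed segment, advances _num_segment across the |
| // contiguous prefix of finished segments, and triggers delete bitmap calculation |
| // for merge-on-write tables. |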
| Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
| uint32_t segid_offset = segment_id - _segment_start_id; |
| { |
| std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
| CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true); |
| _segid_statistics_map.emplace(segment_id, segstat); |
| if (segment_id >= _segment_num_rows.size()) { |
| _segment_num_rows.resize(segment_id + 1); |
| } |
| _segment_num_rows[segid_offset] = cast_set<uint32_t>(segstat.row_num); |
| } |
| VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id |
| << " row_num:" << segstat.row_num << " data_size:" << segstat.data_size |
| << " index_size:" << segstat.index_size; |
| |
| { |
| std::lock_guard<std::mutex> lock(_segment_set_mutex); |
| _segment_set.add(segid_offset); |
| while (_segment_set.contains(_num_segment)) { |
| _num_segment++; |
| } |
| } |
| |
| if (_context.mow_context != nullptr) { |
| RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); |
| } |
| return Status::OK(); |
| } |
| |
| Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
| RETURN_IF_ERROR(BaseBetaRowsetWriter::add_segment(segment_id, segstat)); |
| return _segcompaction_if_necessary(); |
| } |
| |
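| // Finalizes a segcompaction output segment: writes its footer, closes its inverted |
| // indexes and records its statistics under the compacted segment id. |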
| Status BetaRowsetWriter::flush_segment_writer_for_segcompaction( |
| std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size, |
| KeyBoundsPB& key_bounds) { |
| uint32_t segid = (*writer)->get_segment_id(); |
| uint32_t row_num = (*writer)->row_count(); |
| uint64_t segment_size; |
| |
| auto s = (*writer)->finalize_footer(&segment_size); |
| if (!s.ok()) { |
| return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to finalize segment: {}", |
| s.to_string()); |
| } |
| int64_t inverted_index_file_size = 0; |
| RETURN_IF_ERROR((*writer)->close_inverted_index(&inverted_index_file_size)); |
| |
| SegmentStatistics segstat; |
| segstat.row_num = row_num; |
| segstat.data_size = segment_size; |
| segstat.index_size = inverted_index_file_size; |
| segstat.key_bounds = key_bounds; |
| { |
| std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
| CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true); |
| _segid_statistics_map.emplace(segid, segstat); |
| } |
| VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num |
| << " data_size:" << PrettyPrinter::print_bytes(segment_size) |
| << " index_size:" << PrettyPrinter::print_bytes(inverted_index_file_size); |
| |
| writer->reset(); |
| |
| return Status::OK(); |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |