| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "cloud/cloud_rowset_writer.h" |
| |
| #include "common/status.h" |
| #include "io/cache/block_file_cache_factory.h" |
| #include "io/fs/packed_file_writer.h" |
| #include "olap/rowset/rowset_factory.h" |
| |
| namespace doris { |
| |
| CloudRowsetWriter::CloudRowsetWriter(CloudStorageEngine& engine) : _engine(engine) {} |
| |
| CloudRowsetWriter::~CloudRowsetWriter() = default; |
| |
| Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
| _context = rowset_writer_context; |
| _rowset_meta = std::make_shared<RowsetMeta>(); |
| |
| if (_context.is_local_rowset()) { |
| // In cloud mode, this branch implies it is an intermediate rowset for external merge sort, |
| // we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`). |
| _context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path(); |
| } else { |
| _rowset_meta->set_remote_storage_resource(*_context.storage_resource); |
| } |
| |
| _rowset_meta->set_rowset_id(_context.rowset_id); |
| _rowset_meta->set_partition_id(_context.partition_id); |
| _rowset_meta->set_tablet_id(_context.tablet_id); |
| _rowset_meta->set_index_id(_context.index_id); |
| _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash); |
| _rowset_meta->set_rowset_type(_context.rowset_type); |
| _rowset_meta->set_rowset_state(_context.rowset_state); |
| _rowset_meta->set_segments_overlap(_context.segments_overlap); |
| _rowset_meta->set_txn_id(_context.txn_id); |
| _rowset_meta->set_txn_expiration(_context.txn_expiration); |
| _rowset_meta->set_compaction_level(_context.compaction_level); |
| if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) { |
| _is_pending = true; |
| _rowset_meta->set_load_id(_context.load_id); |
| } else { |
| // Rowset generated by compaction or schema change |
| _rowset_meta->set_version(_context.version); |
| DCHECK_NE(_context.newest_write_timestamp, -1); |
| _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); |
| } |
| _rowset_meta->set_tablet_schema(_context.tablet_schema); |
| _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this); |
| _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this); |
| if (_context.mow_context != nullptr) { |
| _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token(); |
| } |
| return Status::OK(); |
| } |
| |
| Status CloudRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) { |
| // Call base class implementation |
| RETURN_IF_ERROR(BaseBetaRowsetWriter::_build_rowset_meta(rowset_meta, check_segment_num)); |
| |
| // Collect packed file segment index information for interim rowsets as well. |
| return _collect_all_packed_slice_locations(rowset_meta); |
| } |
| |
| Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) { |
| if (_calc_delete_bitmap_token != nullptr) { |
| RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); |
| } |
| RETURN_IF_ERROR(_close_file_writers()); |
| |
| // TODO(plat1ko): check_segment_footer |
| |
| RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get())); |
| // At this point all writers have been closed, so collecting packed file indices is safe. |
| RETURN_IF_ERROR(_collect_all_packed_slice_locations(_rowset_meta.get())); |
| // If the current load is a partial update, new segments may be appended to the tmp rowset after the tmp rowset |
| // has been committed if conflicts occur due to concurrent partial updates. However, when the recycler do recycling, |
| // it will generate the paths for the segments to be recycled on the object storage based on the number of segments |
| // in the rowset meta. If these newly added segments are written to the object storage and the transaction is aborted |
| // due to a failure before successfully updating the rowset meta of the corresponding tmp rowset, these newly added |
| // segments cannot be recycled by the recycler on the object storage. Therefore, we need a new state `BEGIN_PARTIAL_UPDATE` |
| // to indicate that the recycler should use list+delete to recycle segments. After the tmp rowset's rowset meta being |
| // updated successfully, the `rowset_state` will be set to `COMMITTED` and the recycler can do recycling based on the |
| // number of segments in the rowset meta safely. |
| // |
| // rowset_state's FSM: |
| // |
| // transfer 0 |
| // PREPARED ---------------------------> COMMITTED |
| // | ^ |
| // | transfer 1 | |
| // | | transfer 2 |
| // |--> BEGIN_PARTIAL_UPDATE ------| |
| // |
| // transfer 0 (PREPARED -> COMMITTED): finish writing a rowset and the rowset' meta will not be changed |
| // transfer 1 (PREPARED -> BEGIN_PARTIAL_UPDATE): finish writing a rowset, but may append new segments later and the rowset's meta may be changed |
| // transfer 2 (BEGIN_PARTIAL_UPDATE -> VISIBLE): finish adding new segments and the rowset' meta will not be changed, the rowset is visible to users |
| if (_context.partial_update_info && _context.partial_update_info->is_partial_update()) { |
| _rowset_meta->set_rowset_state(BEGIN_PARTIAL_UPDATE); |
| } else { |
| _rowset_meta->set_rowset_state(COMMITTED); |
| } |
| |
| _rowset_meta->set_tablet_schema(_context.tablet_schema); |
| |
| if (_rowset_meta->newest_write_timestamp() == -1) { |
| _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
| } |
| |
| if (auto seg_file_size = _seg_files.segments_file_size(_segment_start_id); |
| !seg_file_size.has_value()) [[unlikely]] { |
| LOG(ERROR) << "expected segment file sizes, but none presents: " << seg_file_size.error(); |
| } else { |
| _rowset_meta->add_segments_file_size(seg_file_size.value()); |
| } |
| if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) { |
| if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id); |
| !idx_files_info.has_value()) [[unlikely]] { |
| LOG(ERROR) << "expected inverted index files info, but none presents: " |
| << idx_files_info.error(); |
| } else { |
| _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); |
| } |
| } |
| |
| RETURN_NOT_OK_STATUS_WITH_WARN( |
| RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta, |
| &rowset), |
| "rowset init failed when build new rowset"); |
| _already_built = true; |
| return Status::OK(); |
| } |
| |
| Status CloudRowsetWriter::_collect_all_packed_slice_locations(RowsetMeta* rowset_meta) { |
| if (!_context.packed_file_active) { |
| return Status::OK(); |
| } |
| |
| // Collect segment file packed indices |
| const auto& file_writers = _seg_files.get_file_writers(); |
| for (const auto& [seg_id, writer_ptr] : file_writers) { |
| auto segment_path = _context.segment_path(seg_id); |
| RETURN_IF_ERROR( |
| _collect_packed_slice_location(writer_ptr.get(), segment_path, rowset_meta)); |
| } |
| |
| // Collect inverted index file packed indices |
| const auto& idx_file_writers = _idx_files.get_file_writers(); |
| for (const auto& [seg_id, idx_writer_ptr] : idx_file_writers) { |
| if (idx_writer_ptr != nullptr && idx_writer_ptr->get_file_writer() != nullptr) { |
| auto segment_path = _context.segment_path(seg_id); |
| auto index_prefix_view = |
| InvertedIndexDescriptor::get_index_file_path_prefix(segment_path); |
| std::string index_path = |
| InvertedIndexDescriptor::get_index_file_path_v2(std::string(index_prefix_view)); |
| RETURN_IF_ERROR(_collect_packed_slice_location(idx_writer_ptr->get_file_writer(), |
| index_path, rowset_meta)); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status CloudRowsetWriter::_collect_packed_slice_location(io::FileWriter* file_writer, |
| const std::string& file_path, |
| RowsetMeta* rowset_meta) { |
| // At this point, we only call this when RowsetWriterContext::merge_file_active is true, |
| // and all writers should be MergeFileWriter. So we can safely cast without extra checks. |
| auto* packed_writer = static_cast<io::PackedFileWriter*>(file_writer); |
| |
| if (packed_writer->state() != io::FileWriter::State::CLOSED) { |
| // Writer is still open; index will be collected after it is closed. |
| return Status::OK(); |
| } |
| |
| io::PackedSliceLocation index; |
| RETURN_IF_ERROR(packed_writer->get_packed_slice_location(&index)); |
| if (index.packed_file_path.empty()) { |
| return Status::OK(); // File not in packed file, skip |
| } |
| |
| rowset_meta->add_packed_slice_location(file_path, index.packed_file_path, index.offset, |
| index.size); |
| LOG(INFO) << "collect packed file index: " << file_path << " -> " << index.packed_file_path |
| << ", offset: " << index.offset << ", size: " << index.size; |
| return Status::OK(); |
| } |
| |
| } // namespace doris |