// blob: c5478499bc0b4556e04d866eacceaef5e7735a68 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "segcompaction.h"
#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
#include <limits.h>
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <filesystem>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>
#include "absl/strings/substitute.h"
#include "beta_rowset_writer.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/io_common.h"
#include "olap/data_dir.h"
#include "olap/iterators.h"
#include "olap/merger.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/schema.h"
#include "olap/storage_engine.h"
#include "olap/tablet_reader.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
#include "util/mem_info.h"
#include "util/time.h"
#include "vec/olap/vertical_block_reader.h"
#include "vec/olap/vertical_merge_iterator.h"
namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;
// Constructs a worker bound to `writer`. Only the raw pointer is stored (in `_writer`);
// nothing else is initialized here.
SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {}
void SegcompactionWorker::init_mem_tracker(const RowsetWriterContext& rowset_writer_context) {
_seg_compact_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::COMPACTION,
fmt::format("segcompaction-txnID_{}-loadID_{}-tabletID_{}-indexID_{}-"
"partitionID_{}-version_{}",
std::to_string(rowset_writer_context.txn_id),
print_id(rowset_writer_context.load_id),
std::to_string(rowset_writer_context.tablet_id),
std::to_string(rowset_writer_context.index_id),
std::to_string(rowset_writer_context.partition_id),
rowset_writer_context.version.to_string()));
}
// Builds a VerticalBlockReader over the candidate `segments` for one column group.
//
// segments                     - segments to be compacted together
// tablet                       - tablet the segments belong to
// schema                       - schema restricted to the columns of this group
// stat                         - receives reader statistics
// row_sources_buf              - row-source buffer handed to the reader
// is_key                       - whether this group is the key column group
// return_columns               - column ids to read for this group
// key_group_cluster_key_idxes  - forwarded to the reader params as-is
// reader                       - out: the created reader
//
// Returns INIT_FAILED if a segment iterator cannot be created, otherwise the status of
// the reader's init().
Status SegcompactionWorker::_get_segcompaction_reader(
        SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet,
        std::shared_ptr<Schema> schema, OlapReaderStatistics* stat,
        vectorized::RowSourcesBuffer& row_sources_buf, bool is_key,
        std::vector<uint32_t>& return_columns, std::vector<uint32_t>& key_group_cluster_key_idxes,
        std::unique_ptr<vectorized::VerticalBlockReader>* reader) {
    const auto& ctx = _writer->_context;
    // Row ids are only recorded while reading the key group, and only when the delete
    // bitmap has to be converted afterwards.
    const bool record_rowids = need_convert_delete_bitmap() && is_key;

    StorageReadOptions read_options;
    read_options.stats = stat;
    read_options.use_page_cache = false;
    read_options.tablet_schema = ctx.tablet_schema;
    read_options.record_rowids = record_rowids;

    const bool has_cluster_keys = !tablet->tablet_schema()->cluster_key_uids().empty();
    if (has_cluster_keys) {
        // Compute the delete bitmap between the candidate segments and attach the
        // non-empty per-segment bitmaps to the read options.
        DeleteBitmapPtr between_segments_bitmap =
                std::make_shared<DeleteBitmap>(tablet->tablet_id());
        RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(
                ctx.tablet_schema, ctx.rowset_id, *segments, between_segments_bitmap));
        for (const auto& segment : *segments) {
            auto agg_bitmap = between_segments_bitmap->get_agg(
                    {ctx.rowset_id, segment->id(), DeleteBitmap::TEMP_VERSION_COMMON});
            if (!agg_bitmap->isEmpty()) {
                read_options.delete_bitmap.emplace(segment->id(), std::move(agg_bitmap));
            }
        }
    }

    // One row-wise iterator per segment, plus the row count of each segment.
    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
    std::map<uint32_t, uint32_t> segment_rows;
    for (const auto& segment : *segments) {
        std::unique_ptr<RowwiseIterator> segment_iter;
        auto st = segment->new_iterator(schema, read_options, &segment_iter);
        if (!st.ok()) {
            return Status::Error<INIT_FAILED>("failed to create iterator[{}]: {}", segment->id(),
                                              st.to_string());
        }
        seg_iterators.push_back(std::move(segment_iter));
        segment_rows.emplace(segment->id(), segment->num_rows());
    }

    if (record_rowids && _rowid_conversion != nullptr) {
        _rowid_conversion->reset_segment_map(segment_rows);
    }

    *reader = std::make_unique<vectorized::VerticalBlockReader>(&row_sources_buf);

    TabletReader::ReaderParams reader_params;
    reader_params.is_segcompaction = true;
    reader_params.segment_iters_ptr = &seg_iterators;
    // reader_params.version is intentionally left unset; this should not break segcompaction
    reader_params.tablet_schema = ctx.tablet_schema;
    reader_params.tablet = tablet;
    reader_params.return_columns = return_columns;
    reader_params.is_key_column_group = is_key;
    reader_params.use_page_cache = false;
    reader_params.record_rowids = record_rowids;
    reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes;
    return (*reader)->init(reader_params, nullptr);
}
std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer(
uint32_t begin, uint32_t end) {
Status status;
std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
status = _create_segment_writer_for_segcompaction(&writer, begin, end);
if (!status.ok() || writer == nullptr) {
LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
<< " status:" << status;
return nullptr;
} else {
return writer;
}
}
// Deletes the on-disk files of the source segments [begin, end] (inclusive) after they
// have been compacted: the segment data file, its inverted index file(s), and the
// matching entries in the inverted index searcher cache.
//
// Returns the first failure encountered (each failure is also logged as a warning);
// Status::OK() if everything was removed.
Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t end) {
    DCHECK(_writer->rowset_meta()->is_local());
    const auto& fs = io::global_local_filesystem();
    // Bind by reference: only a few fields are read, no need to copy the whole context.
    const auto& ctx = _writer->_context;
    const auto schema = ctx.tablet_schema;

    // These do not depend on the segment being deleted, so evaluate them once.
    const auto storage_format = schema->get_inverted_index_storage_format();
    const bool has_v2_index_file = (schema->has_inverted_index() || schema->has_ann_index()) &&
                                   storage_format >= InvertedIndexStorageFormatPB::V2;
    const bool is_v1_format = storage_format == InvertedIndexStorageFormatPB::V1;
    const auto rowset_id_str = ctx.rowset_id.to_string();

    for (uint32_t seg_id = begin; seg_id <= end; ++seg_id) {
        const auto seg_path = local_segment_path(ctx.tablet_path, rowset_id_str, seg_id);
        // A failed deletion is logged and returned to the caller. Files left behind are
        // expected to be cleaned up later by the background GC.
        RETURN_NOT_OK_STATUS_WITH_WARN(fs->delete_file(seg_path),
                                       absl::Substitute("Failed to delete file=$0", seg_path));

        const auto index_path_prefix =
                InvertedIndexDescriptor::get_index_file_path_prefix(seg_path);

        // Storage format V2 and later: a single index file per segment.
        if (has_v2_index_file) {
            auto idx_path = InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix);
            VLOG_DEBUG << "segcompaction index. delete file " << idx_path;
            RETURN_NOT_OK_STATUS_WITH_WARN(fs->delete_file(idx_path),
                                           absl::Substitute("Failed to delete file=$0", idx_path));
        }

        // Per-index cleanup: V1 index files (one per index) and cached searchers.
        for (const auto& column : schema->columns()) {
            auto index_infos = schema->inverted_indexs(*column);
            for (const auto& index_info : index_infos) {
                auto index_id = index_info->index_id();
                if (is_v1_format) {
                    auto idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
                            index_path_prefix, index_id, index_info->get_index_suffix());
                    VLOG_DEBUG << "segcompaction index. delete file " << idx_path;
                    RETURN_NOT_OK_STATUS_WITH_WARN(
                            fs->delete_file(idx_path),
                            absl::Substitute("Failed to delete file=$0", idx_path));
                }
                // Erase the origin index file cache
                auto idx_file_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
                        index_path_prefix, index_id, index_info->get_index_suffix());
                RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(idx_file_cache_key));
            }
        }
    }
    return Status::OK();
}
// Verifies the row-count invariants of a finished segcompaction over source segments
// [begin, end] (inclusive):
//   1. rows read (plus rows deleted by bitmap when `is_mow_with_cluster_keys`) must
//      equal the sum of row counts recorded for the source segments;
//   2. output rows + merged rows must equal the rows read;
//   3. no rows may have been filtered.
//
// reader_stat / merger_stat - statistics collected for the key column group
// Returns CHECK_LINES_ERROR describing the first violated invariant, else Status::OK().
Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat,
                                               Merger::Statistics& merger_stat, uint32_t begin,
                                               uint32_t end, bool is_mow_with_cluster_keys) {
    uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */
    uint64_t rows_del_by_bitmap = reader_stat.rows_del_by_bitmap;
    uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */
    uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */
    uint64_t output_rows = merger_stat.output_rows;     /* rows after merge */
    uint64_t merged_rows = merger_stat.merged_rows;     /* dup key merged by unique/agg */

    {
        std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex);
        // Segment ids are uint32_t; keep the index unsigned to avoid a signed/unsigned
        // mix in the comparison and the lookup.
        for (uint32_t seg_id = begin; seg_id <= end; ++seg_id) {
            sum_src_row += _writer->_segid_statistics_map[seg_id].row_num;
        }
    }

    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });

    // For merge-on-write with cluster keys, rows removed by the delete bitmap are added
    // back before comparing with the source row count.
    uint64_t raw_rows = raw_rows_read;
    if (is_mow_with_cluster_keys) {
        raw_rows += rows_del_by_bitmap;
    }
    if (raw_rows != sum_src_row) {
        return Status::Error<CHECK_LINES_ERROR>(
                "segcompaction read row num does not match source. expect read row:{}, actual read "
                "row:{}(raw_rows_read: {}, rows_del_by_bitmap: {})",
                sum_src_row, raw_rows, raw_rows_read, rows_del_by_bitmap);
    }

    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });

    if ((output_rows + merged_rows) != raw_rows_read) {
        return Status::Error<CHECK_LINES_ERROR>(
                "segcompaction total row num does not match after merge. expect total row:{}, "
                "actual total row:{}, (output_rows:{},merged_rows:{})",
                raw_rows_read, output_rows + merged_rows, output_rows, merged_rows);
    }

    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows",
                    { filtered_rows++; });

    if (filtered_rows != 0) {
        return Status::Error<CHECK_LINES_ERROR>(
                "segcompaction should not have filtered rows but actual filtered rows:{}",
                filtered_rows);
    }
    return Status::OK();
}
// Thin forwarding wrapper: asks the owning rowset writer (`_writer`) to create the
// segment writer for segments [begin, end] and returns its status unchanged.
// `writer` is the out-parameter passed straight through to the rowset writer.
Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end) {
return _writer->create_segment_writer_for_segcompaction(writer, begin, end);
}
// Compacts the candidate `segments` into a single new segment.
//
// Steps, in order:
//   1. bail out early if memory is short or the output writer cannot be created;
//   2. split columns into groups and compact them one group at a time (the first group
//      is treated as the key group and produces the row sources used by later groups);
//   3. verify row counts, flush the new segment, delete the source segments, convert
//      the delete bitmap if needed, and rename the compacted segment;
//   4. advance the writer's `_segcompacted_point` by the number of source segment ids.
//
// Returns Status::OK() on success (or when there is nothing to compact), otherwise the
// first error encountered.
Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
    DCHECK(_seg_compact_mem_tracker != nullptr);
    SCOPED_ATTACH_TASK(_seg_compact_mem_tracker);

    /* nothing to do; also protects the front()/back() accesses below */
    if (segments == nullptr || segments->empty()) {
        LOG(WARNING) << "segcompaction got no candidate segments, skipping";
        return Status::OK();
    }

    /* throttle segcompaction task if memory depleted */
    if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
        return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to memory shortage");
    }

    uint32_t begin = segments->front()->id();
    uint32_t end = segments->back()->id();
    uint64_t begin_time = GetCurrentTimeMicros();
    uint64_t index_size = 0;
    uint64_t total_index_size = 0;
    // By-value copy of the writer context, taken once for the whole task.
    auto ctx = _writer->_context;

    auto writer = _create_segcompaction_writer(begin, end);
    if (UNLIKELY(writer == nullptr)) {
        return Status::Error<SEGCOMPACTION_INIT_WRITER>("failed to get segcompaction writer");
    }

    DCHECK(ctx.tablet);
    auto tablet = std::static_pointer_cast<Tablet>(ctx.tablet);
    if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) {
        _rowid_conversion = std::make_unique<SimpleRowIdConversion>(_writer->rowset_id());
    }

    std::vector<std::vector<uint32_t>> column_groups;
    std::vector<uint32_t> key_group_cluster_key_idxes;
    Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups,
                                   &key_group_cluster_key_idxes);
    vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
                                                 ReaderType::READER_SEGMENT_COMPACTION);

    KeyBoundsPB key_bounds;
    Merger::Statistics key_merger_stats;
    OlapReaderStatistics key_reader_stats;

    /* compact group one by one */
    for (size_t i = 0; i < column_groups.size(); ++i) {
        VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
        const bool is_key = (i == 0);
        // Each group is visited exactly once, so bind to it instead of copying.
        std::vector<uint32_t>& column_ids = column_groups[i];

        writer->clear();
        RETURN_IF_ERROR(writer->init(column_ids, is_key));
        auto schema = std::make_shared<Schema>(ctx.tablet_schema->columns(), column_ids);
        OlapReaderStatistics reader_stats;
        std::unique_ptr<vectorized::VerticalBlockReader> reader;
        auto s =
                _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf,
                                          is_key, column_ids, key_group_cluster_key_idxes, &reader);
        if (UNLIKELY(reader == nullptr || !s.ok())) {
            return Status::Error<SEGCOMPACTION_INIT_READER>(
                    "failed to get segcompaction reader. err: {}", s.to_string());
        }

        Merger::Statistics merger_stats;
        RETURN_IF_ERROR(Merger::vertical_compact_one_group(
                tablet->tablet_id(), ReaderType::READER_SEGMENT_COMPACTION, *ctx.tablet_schema,
                is_key, column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size,
                key_bounds, _rowid_conversion.get()));
        total_index_size += index_size;

        if (is_key) {
            // Only the key group's statistics are used by the correctness check below.
            RETURN_IF_ERROR(row_sources_buf.flush());
            key_merger_stats = merger_stats;
            key_reader_stats = reader_stats;
        }
        RETURN_IF_ERROR(row_sources_buf.seek_to_begin());
    }

    /* check row num after merge/aggregation */
    bool is_mow_with_cluster_keys = !tablet->tablet_schema()->cluster_key_uids().empty();
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_correctness(key_reader_stats, key_merger_stats, begin,
                                                      end, is_mow_with_cluster_keys),
                                   "check correctness failed");

    {
        std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex);
        _writer->_clear_statistics_for_deleting_segments_unsafe(begin, end);
    }

    RETURN_IF_ERROR(
            _writer->flush_segment_writer_for_segcompaction(&writer, total_index_size, key_bounds));

    if (_file_writer != nullptr && _file_writer->state() != io::FileWriter::State::CLOSED) {
        RETURN_IF_ERROR(_file_writer->close());
    }

    RETURN_IF_ERROR(_delete_original_segments(begin, end));

    if (_rowid_conversion != nullptr) {
        convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end,
                                      _writer->_num_segcompacted);
    }

    RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end));

    if (_index_file_writer != nullptr) {
        _index_file_writer.reset();
    }

    if (VLOG_DEBUG_IS_ON) {
        // Debug aid: dump the tablet directory listing after the rename.
        _writer->vlog_buffer.clear();
        for (const auto& entry : std::filesystem::directory_iterator(ctx.tablet_path)) {
            fmt::format_to(_writer->vlog_buffer, "[{}]", std::string(entry.path()));
        }
        VLOG_DEBUG << "tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id
                   << "_segcompacted_point:" << _writer->_segcompacted_point
                   << " _num_segment:" << _writer->_num_segment
                   << " _num_segcompacted:" << _writer->_num_segcompacted
                   << " list directory:" << fmt::to_string(_writer->vlog_buffer);
    }

    _writer->_segcompacted_point += (end - begin + 1);

    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
    LOG(INFO) << "segcompaction completed. tablet_id:" << ctx.tablet_id
              << " rowset_id:" << ctx.rowset_id << " elapsed time:" << elapsed
              << "us. update segcompacted_point:" << _writer->_segcompacted_point
              << " segment num:" << segments->size() << " begin:" << begin << " end:" << end;

    return Status::OK();
}
// Task entry point: runs one segcompaction over `segments` unless the worker has been
// cancelled, classifies any failure, and finally marks the writer as no longer
// compacting and wakes up waiters.
void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) {
    // Claim the "mutable" state; if it was already false the task has been cancelled.
    if (!_is_compacting_state_mutable.exchange(false)) {
        // note: be aware that _writer maybe released when the task is cancelled
        LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task";
        return;
    }

    const Status status = _do_compact_segments(segments);
    if (!status.ok()) {
        const int errcode = status.code();
        const bool retry_later = errcode == FETCH_MEMORY_EXCEEDED ||
                                 errcode == SEGCOMPACTION_INIT_READER ||
                                 errcode == SEGCOMPACTION_INIT_WRITER;
        if (retry_later) {
            LOG(WARNING) << "segcompaction failed, try next time:" << status;
        } else {
            auto ctx = _writer->_context;
            LOG(WARNING) << "segcompaction fatal, terminating the write job."
                         << " tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id
                         << " status:" << status;
            // status will be checked by the next trigger of segcompaction or the final wait
            _writer->_segcompaction_status.store(ErrorCode::INTERNAL_ERROR);
        }
    }

    DCHECK_EQ(_writer->_is_doing_segcompaction, true);
    {
        std::lock_guard lk(_writer->_is_doing_segcompaction_lock);
        _writer->_is_doing_segcompaction = false;
        _writer->_segcompacting_cond.notify_all();
    }
    // Allow the next task (or cancel()) to claim the state again.
    _is_compacting_state_mutable.store(true);
}
// Returns true only when all of the following hold: the worker has a writer, the
// writer's context has a tablet, the tablet uses UNIQUE_KEYS with merge-on-write
// enabled, and its schema has a sequence column.
bool SegcompactionWorker::need_convert_delete_bitmap() {
    if (_writer == nullptr) {
        return false;
    }
    auto tablet = _writer->context().tablet;
    if (tablet == nullptr) {
        return false;
    }
    if (tablet->keys_type() != KeysType::UNIQUE_KEYS) {
        return false;
    }
    if (!tablet->enable_unique_key_merge_on_write()) {
        return false;
    }
    return tablet->tablet_schema()->has_sequence_col();
}
// Copies the delete bitmap of segment `src_seg_id` (if `src_delete_bitmap` has one)
// into the converted delete bitmap under `dest_seg_id`. Row ids are copied unchanged.
void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
                                                        uint32_t src_seg_id, uint32_t dest_seg_id) {
    // The converted bitmap is created on first use.
    if (_converted_delete_bitmap == nullptr) {
        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
    }

    auto rowset_id = _writer->context().rowset_id;
    const auto* src_rows =
            src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON});
    if (src_rows == nullptr) {
        return;
    }
    _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON},
                                  *src_rows);
}
// Translates the delete bitmaps of source segments [src_begin, src_end] (inclusive)
// into the converted delete bitmap of the compacted segment `dst_seg_id`. Each deleted
// row id is mapped through `_rowid_conversion`; rows whose mapping is negative are
// skipped.
void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
                                                        uint32_t src_begin, uint32_t src_end,
                                                        uint32_t dst_seg_id) {
    // The converted bitmap is created on first use.
    if (_converted_delete_bitmap == nullptr) {
        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
    }

    auto rowset_id = _writer->context().rowset_id;
    RowLocation src_loc(rowset_id, 0, 0);
    for (uint32_t cur_seg = src_begin; cur_seg <= src_end; cur_seg++) {
        const auto* deleted_rows =
                src_delete_bitmap->get({rowset_id, cur_seg, DeleteBitmap::TEMP_VERSION_COMMON});
        if (deleted_rows == nullptr) {
            continue;
        }
        src_loc.segment_id = cur_seg;
        for (unsigned int old_row_id : *deleted_rows) {
            src_loc.row_id = old_row_id;
            auto new_row_id = _rowid_conversion->get(src_loc);
            if (new_row_id >= 0) {
                _converted_delete_bitmap->add(
                        {rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, new_row_id);
            }
        }
    }
}
// Tries to cancel the worker by atomically clearing the "mutable" state flag.
// Returns true if the task was cancellable (the actual compaction had not started),
// false if it was not (segcompaction is in progress).
bool SegcompactionWorker::cancel() {
    const bool was_mutable = _is_compacting_state_mutable.exchange(false);
    return was_mutable;
}
#include "common/compile_check_end.h"
} // namespace doris