blob: c3907621e8ed1739cfd2a9acac8a1674d62549f7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/segment_v2/segment_writer.h"
#include <assert.h>
#include <gen_cpp/segment_v2.pb.h>
#include <parallel_hashmap/phmap.h>
#include <algorithm>
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include <crc32c/crc32c.h>
#include "cloud/config.h"
#include "common/cast_set.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h" // LOG
#include "common/status.h"
#include "inverted_index_fs_directory.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/data_dir.h"
#include "olap/key_coder.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/partial_update_info.h"
#include "olap/primary_key_index.h"
#include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep
#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
#include "olap/rowset/segment_creator.h"
#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
#include "olap/rowset/segment_v2/external_col_meta_util.h"
#include "olap/rowset/segment_v2/index_file_writer.h"
#include "olap/rowset/segment_v2/index_writer.h"
#include "olap/rowset/segment_v2/page_io.h"
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/rowset/segment_v2/variant/variant_ext_meta_writer.h"
#include "olap/rowset/segment_v2/variant_stats_calculator.h"
#include "olap/segment_loader.h"
#include "olap/short_key_index.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/point_query_executor.h"
#include "util/coding.h"
#include "util/faststring.h"
#include "util/key_util.h"
#include "util/simd/bits.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/variant_util.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/jsonb/serialize.h"
#include "vec/olap/olap_data_convertor.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris {
namespace segment_v2 {
#include "common/compile_check_begin.h"
using namespace ErrorCode;
// Magic byte sequence associated with a segment file.
// NOTE(review): presumably written to / verified against the segment footer — the use
// site is not visible in this part of the file, confirm before relying on it.
const char* k_segment_magic = "D0R1";
// Number of magic bytes: the four characters of "D0R1", without the terminating NUL.
const uint32_t k_segment_magic_length = 4;
// Builds the label used for the per-segment memory tracker:
// the fixed prefix "SegmentWriter:Segment-" followed by the decimal segment id.
inline std::string segment_mem_tracker_name(uint32_t segment_id) {
    std::string tracker_name("SegmentWriter:Segment-");
    tracker_name += std::to_string(segment_id);
    return tracker_name;
}
// Constructs a writer for the segment `segment_id`.
//
// file_writer        destination of the segment data; must not be null (CHECK-ed).
// tablet_schema      schema whose columns drive the key coders built here.
// tablet             tablet handle, stored for later key lookups.
// data_dir           stored as-is.
// opts               writer options, copied into the writer.
// index_file_writer  stored as-is; handed to column writers when they are created.
//
// Besides storing its arguments the constructor prepares the key coders:
//  * by default one coder + index length per schema key column;
//  * for merge-on-write tables additionally a coder for the sequence column (if any);
//  * for merge-on-write tables with cluster keys the schema-key coders become the
//    *primary* key coders and `_key_coders` / `_key_index_size` are rebuilt from the
//    cluster key columns, which then act as the sort key.
SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
                             TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet,
                             DataDir* data_dir, const SegmentWriterOptions& opts,
                             IndexFileWriter* index_file_writer)
        : _segment_id(segment_id),
          _tablet_schema(std::move(tablet_schema)),
          _tablet(std::move(tablet)),
          _data_dir(data_dir),
          _opts(opts),
          _file_writer(file_writer),
          _index_file_writer(index_file_writer),
          _mem_tracker(std::make_unique<MemTracker>(segment_mem_tracker_name(segment_id))),
          // `opts` is a const reference, so its members can only be copied. The former
          // std::move() here silently degraded to a copy and merely looked like a transfer
          // of ownership; spell the copy out instead.
          _mow_context(opts.mow_ctx) {
    CHECK_NOTNULL(file_writer);
    _num_sort_key_columns = _tablet_schema->num_key_columns();
    _num_short_key_columns = _tablet_schema->num_short_key_columns();
    if (!_is_mow_with_cluster_key()) {
        // Without cluster keys the short key is a prefix of the sort key.
        DCHECK(_num_sort_key_columns >= _num_short_key_columns)
                << ", table_id=" << _tablet_schema->table_id()
                << ", num_key_columns=" << _num_sort_key_columns
                << ", num_short_key_columns=" << _num_short_key_columns
                << ", cluster_key_columns=" << _tablet_schema->cluster_key_uids().size();
    }
    // One coder and one index length per schema key column.
    for (size_t cid = 0; cid < _num_sort_key_columns; ++cid) {
        const auto& column = _tablet_schema->column(cid);
        _key_coders.push_back(get_key_coder(column.type()));
        _key_index_size.push_back(cast_set<uint16_t>(column.index_length()));
    }
    if (_is_mow()) {
        // encode the sequence id into the primary key index
        if (_tablet_schema->has_sequence_col()) {
            const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx());
            _seq_coder = get_key_coder(column.type());
        }
        // encode the rowid into the primary key index
        if (_is_mow_with_cluster_key()) {
            const auto* type_info = get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
            _rowid_coder = get_key_coder(type_info->type());
            // primary keys: the coders collected above describe the primary key
            _primary_key_coders.swap(_key_coders);
            // cluster keys: rebuild the sort-key coders from the cluster key columns
            _key_coders.clear();
            _key_index_size.clear();
            _num_sort_key_columns = _tablet_schema->cluster_key_uids().size();
            for (auto cid : _tablet_schema->cluster_key_uids()) {
                const auto& column = _tablet_schema->column_by_uid(cid);
                _key_coders.push_back(get_key_coder(column.type()));
                _key_index_size.push_back(cast_set<uint16_t>(column.index_length()));
            }
        }
    }
}
// Releases from the segment's memory tracker exactly the amount it currently reports
// as consumed.
SegmentWriter::~SegmentWriter() {
    const auto outstanding = _mem_tracker->consumption();
    _mem_tracker->release(outstanding);
}
// Fills the footer entry `meta` from the tablet column `column`.
//
// meta           protobuf message to populate.
// column_id      id stored in the entry; nested sub-columns receive the same id.
// column         source of type, length, nullability, default value, etc.
// tablet_schema  only forwarded to the recursive calls for sub-columns.
//
// Encoding is always DEFAULT_ENCODING and compression comes from the writer options.
// One child entry is appended (recursively) per sub-column of `column`.
void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id,
                                     const TabletColumn& column, TabletSchemaSPtr tablet_schema) {
    // Identity and physical layout.
    meta->set_column_id(column_id);
    meta->set_type(static_cast<int>(column.type()));
    meta->set_length(column.length());
    meta->set_encoding(DEFAULT_ENCODING);
    meta->set_compression(_opts.compression_type);

    // Value semantics.
    meta->set_is_nullable(column.is_nullable());
    meta->set_default_value(column.default_value());
    meta->set_precision(column.precision());
    meta->set_frac(column.frac());

    // Path information is only present on some columns.
    if (column.has_path_info()) {
        auto* path_info_pb = meta->mutable_column_path_info();
        column.path_info_ptr()->to_protobuf(path_info_pb, column.parent_unique_id());
    }
    meta->set_unique_id(column.unique_id());

    // Nested types: describe every sub-column as a child entry.
    const auto child_count = column.get_subtype_count();
    for (uint32_t child = 0; child < child_count; ++child) {
        ColumnMetaPB* child_meta = meta->add_children_columns();
        init_column_meta(child_meta, column_id, column.get_sub_column(child), tablet_schema);
    }

    // Aggregation related attributes.
    meta->set_result_is_nullable(column.get_result_is_nullable());
    meta->set_function_name(column.get_aggregation_name());
    meta->set_be_exec_version(column.get_be_exec_version());

    if (column.is_variant_type()) {
        meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
    }
}
// Convenience initializer: prepares the writer for *all* columns of the tablet schema
// (column ids 0 .. num_columns-1) with key handling enabled.
// Returns whatever init(col_ids, has_key) returns.
Status SegmentWriter::init() {
    // Keep the bound in the same unsigned type as the ids, so the loop below does not
    // compare a signed count with an unsigned index.
    const auto column_cnt = cast_set<uint32_t>(_tablet_schema->num_columns());
    std::vector<uint32_t> column_ids;
    column_ids.reserve(column_cnt);
    for (uint32_t i = 0; i < column_cnt; ++i) {
        column_ids.emplace_back(i);
    }
    return init(column_ids, true);
}
// Creates, initializes and registers the ColumnWriter for one column.
//
// cid     column id recorded in the footer entry of this column.
// column  the tablet column to write.
// schema  schema consulted for index definitions and the keys type.
//
// A new footer column entry is added and filled, the per-column options (zone map,
// bloom filter / ngram bloom filter, inverted index, ANN index, page sizes, encoding
// preferences) are derived, and the resulting writer is appended to `_column_writers`.
// The column is also registered with `_olap_data_convertor`.
//
// Returns NotSupported for ngram bloom filter parameters that do not fit the option
// fields, or any error reported by ColumnWriter::create() / ColumnWriter::init().
Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column,
                                            const TabletSchemaSPtr& schema) {
    ColumnWriterOptions opts;
    opts.meta = _footer.add_columns();
    init_column_meta(opts.meta, cid, column, schema);
    // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS
    // except for columns whose type don't support zone map.
    opts.need_zone_map = column.is_key() || schema->keys_type() != KeysType::AGG_KEYS;
    opts.need_bloom_filter = column.is_bf_column();
    if (opts.need_bloom_filter) {
        opts.bf_options.fpp = schema->has_bf_fpp() ? schema->bloom_filter_fpp() : 0.05;
    }
    auto* tablet_index = schema->get_ngram_bf_index(column.unique_id());
    if (tablet_index) {
        opts.need_bloom_filter = true;
        opts.is_ngram_bf_index = true;
        //narrow convert from int32_t to uint8_t and uint16_t which is dangerous
        auto gram_size = tablet_index->get_gram_size();
        auto gram_bf_size = tablet_index->get_gram_bf_size();
        // gram_size is narrowed to uint8_t below, so the largest acceptable value is 255.
        // (The previous bound of 256 let a value through that uint8_t cannot represent.)
        if (gram_size > 255 || gram_size < 1) {
            return Status::NotSupported("Do not support ngram bloom filter for ngram_size: ",
                                        gram_size);
        }
        // gram_bf_size is narrowed to uint16_t below, hence the 65535 upper bound.
        if (gram_bf_size > 65535 || gram_bf_size < 64) {
            return Status::NotSupported("Do not support ngram bloom filter for bf_size: ",
                                        gram_bf_size);
        }
        opts.gram_size = cast_set<uint8_t>(gram_size);
        opts.gram_bf_size = cast_set<uint16_t>(gram_bf_size);
    }
    bool skip_inverted_index = false;
    if (_opts.rowset_ctx != nullptr) {
        // skip write inverted index for index compaction column
        skip_inverted_index =
                _opts.rowset_ctx->columns_to_do_index_compaction.count(column.unique_id()) > 0;
    }
    // skip write inverted index on load if skip_write_index_on_load is true
    if (_opts.write_type == DataWriteType::TYPE_DIRECT && schema->skip_write_index_on_load()) {
        skip_inverted_index = true;
    }
    // inverted indexes for this column
    if (!skip_inverted_index) {
        auto inverted_indexs = schema->inverted_indexs(column);
        if (!inverted_indexs.empty()) {
            opts.inverted_indexes = inverted_indexs;
            opts.need_inverted_index = true;
            DCHECK(_index_file_writer != nullptr);
        }
    }
    // ANN index for this column
    if (const auto& index = schema->ann_index(column); index != nullptr) {
        opts.ann_index = index;
        opts.need_ann_index = true;
        DCHECK(_index_file_writer != nullptr);
    }
    opts.index_file_writer = _index_file_writer;
    // Zone map and bloom filter are switched off for the field types listed below.
#define DISABLE_INDEX_IF_FIELD_TYPE(TYPE, type_name)          \
    if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \
        opts.need_zone_map = false;                           \
        opts.need_bloom_filter = false;                       \
    }
    DISABLE_INDEX_IF_FIELD_TYPE(STRUCT, "struct")
    DISABLE_INDEX_IF_FIELD_TYPE(ARRAY, "array")
    DISABLE_INDEX_IF_FIELD_TYPE(JSONB, "jsonb")
    DISABLE_INDEX_IF_FIELD_TYPE(AGG_STATE, "agg_state")
    DISABLE_INDEX_IF_FIELD_TYPE(MAP, "map")
    DISABLE_INDEX_IF_FIELD_TYPE(BITMAP, "object")
    DISABLE_INDEX_IF_FIELD_TYPE(HLL, "hll")
    DISABLE_INDEX_IF_FIELD_TYPE(QUANTILE_STATE, "quantile_state")
    DISABLE_INDEX_IF_FIELD_TYPE(VARIANT, "variant")
#undef DISABLE_INDEX_IF_FIELD_TYPE
    int64_t storage_page_size = _tablet_schema->storage_page_size();
    // storage_page_size must be between 4KB and 10MB; values outside that range leave
    // the option's existing data_page_size untouched.
    if (storage_page_size >= 4096 && storage_page_size <= 10485760) {
        opts.data_page_size = storage_page_size;
    }
    opts.dict_page_size = _tablet_schema->storage_dict_page_size();
    // Debug hook: fails the call when the effective data page size of the given table
    // differs from the size passed to the debug point.
    DBUG_EXECUTE_IF("VerticalSegmentWriter._create_column_writer.storage_page_size", {
        auto table_id = DebugPoints::instance()->get_debug_param_or_default<int64_t>(
                "VerticalSegmentWriter._create_column_writer.storage_page_size", "table_id",
                INT_MIN);
        auto target_data_page_size = DebugPoints::instance()->get_debug_param_or_default<int64_t>(
                "VerticalSegmentWriter._create_column_writer.storage_page_size",
                "storage_page_size", INT_MIN);
        if (table_id == INT_MIN || target_data_page_size == INT_MIN) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "Debug point parameters missing: either 'table_id' or 'storage_page_size' not "
                    "set.");
        }
        if (table_id == _tablet_schema->table_id() &&
            opts.data_page_size != target_data_page_size) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "Mismatch in 'storage_page_size': expected size does not match the current "
                    "data page size. "
                    "Expected: " +
                    std::to_string(target_data_page_size) +
                    ", Actual: " + std::to_string(opts.data_page_size) + ".");
        }
    })
    if (column.is_row_store_column()) {
        // smaller page size for row store column
        auto page_size = _tablet_schema->row_store_page_size();
        opts.data_page_size =
                (page_size > 0) ? page_size : segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
    }
    opts.rowset_ctx = _opts.rowset_ctx;
    opts.file_writer = _file_writer;
    opts.compression_type = _opts.compression_type;
    opts.footer = &_footer;
    if (_opts.rowset_ctx != nullptr) {
        opts.input_rs_readers = _opts.rowset_ctx->input_rs_readers;
    }
    opts.encoding_preference = {.integer_type_default_use_plain_encoding =
                                        _tablet_schema->integer_type_default_use_plain_encoding(),
                                .binary_plain_encoding_default_impl =
                                        _tablet_schema->binary_plain_encoding_default_impl()};
    std::unique_ptr<ColumnWriter> writer;
    RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer));
    RETURN_IF_ERROR(writer->init());
    _column_writers.push_back(std::move(writer));
    _olap_data_convertor->add_column_data_convertor(column);
    return Status::OK();
}
// Prepares the writer for the columns listed in `col_ids`.
//
// col_ids  schema column ids to write, in writer order.
// has_key  whether key indexes have to be built for this writer.
//
// Creates the data convertor, one column writer per id and the variant statistics
// calculator. When `has_key` is set the key index builders are created as well:
//  * merge-on-write: a primary key index builder (plus a short key index builder when
//    cluster keys are in use);
//  * otherwise: a short key index builder only.
Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
    DCHECK(_column_writers.empty());
    DCHECK(_column_ids.empty());
    _has_key = has_key;
    _column_writers.reserve(_tablet_schema->columns().size());
    _column_ids.insert(_column_ids.end(), col_ids.begin(), col_ids.end());
    _olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>();

    // Fall back to the schema's compression when the options do not name one.
    if (_opts.compression_type == UNKNOWN_COMPRESSION) {
        _opts.compression_type = _tablet_schema->compression_type();
    }

    RETURN_IF_ERROR(_create_writers(_tablet_schema, col_ids));

    // Initialize variant statistics calculator
    _variant_stats_calculator =
            std::make_unique<VariantStatsCaculator>(&_footer, _tablet_schema, col_ids);

    if (!_has_key) {
        return Status::OK();
    }

    if (!_is_mow()) {
        // Plain key tables only need the short key index.
        _short_key_index_builder =
                std::make_unique<ShortKeyIndexBuilder>(_segment_id, _opts.num_rows_per_block);
        return Status::OK();
    }

    // Merge-on-write: the primary key index replaces the short key index, unless
    // cluster keys are used, in which case both are built.
    size_t seq_col_length = 0;
    if (_tablet_schema->has_sequence_col()) {
        seq_col_length = _tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1;
    }
    size_t rowid_length = 0;
    if (_is_mow_with_cluster_key()) {
        rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
        _short_key_index_builder =
                std::make_unique<ShortKeyIndexBuilder>(_segment_id, _opts.num_rows_per_block);
    }
    _primary_key_index_builder =
            std::make_unique<PrimaryKeyIndexBuilder>(_file_writer, seq_col_length, rowid_length);
    RETURN_IF_ERROR(_primary_key_index_builder->init());
    return Status::OK();
}
// Creates one column writer per entry of `col_ids`, in order, looking each column up
// in `tablet_schema`. Stops at and returns the first error.
Status SegmentWriter::_create_writers(const TabletSchemaSPtr& tablet_schema,
                                      const std::vector<uint32_t>& col_ids) {
    _olap_data_convertor->reserve(col_ids.size());
    for (auto id_it = col_ids.begin(); id_it != col_ids.end(); ++id_it) {
        const auto& schema_column = tablet_schema->column(*id_it);
        RETURN_IF_ERROR(_create_column_writer(*id_it, schema_column, tablet_schema));
    }
    return Status::OK();
}
// Drops the row cache entry for `key` of the tablet being written, but only when the
// row cache is enabled, the schema keeps a row store for all columns and this is a
// direct write.
//
// Just invalid row cache for simplicity, since the rowset is not visible at present.
// If we update/insert cache, if load failed rowset will not be visible but cached data
// will be visible, and lead to inconsistency.
void SegmentWriter::_maybe_invalid_row_cache(const std::string& key) {
    if (config::disable_storage_row_cache) {
        return;
    }
    if (!_tablet_schema->has_row_store_for_all_columns()) {
        return;
    }
    if (_opts.write_type != DataWriteType::TYPE_DIRECT) {
        return;
    }
    // invalidate cache
    RowCache::instance()->erase({_opts.rowset_ctx->tablet_id, key});
}
// Rebuilds the row store column of `block`.
//
// The first schema column flagged as row store column is located, its string column in
// `block` is cleared and refilled with the JSONB serialization of the block's rows.
// Nothing happens for an empty block or when the schema has no row store column.
// NOTE: `block` is taken by const reference but the row store column inside it is
// modified in place.
void SegmentWriter::_serialize_block_to_row_column(const vectorized::Block& block) {
    if (block.rows() == 0) {
        return;
    }
    MonotonicStopWatch watch;
    watch.start();
    const int num_columns = cast_set<int>(_tablet_schema->num_columns());
    // Index of the row store column that was filled; stays -1 when there is none.
    // (It used to be left at 0 unconditionally, which made the debug log misleading.)
    int row_column_id = -1;
    for (int i = 0; i < num_columns; ++i) {
        if (_tablet_schema->column(i).is_row_store_column()) {
            row_column_id = i;
            auto* row_store_column = static_cast<vectorized::ColumnString*>(
                    block.get_by_position(i).column->assume_mutable_ref().assume_mutable().get());
            row_store_column->clear();
            vectorized::DataTypeSerDeSPtrs serdes =
                    vectorized::create_data_type_serdes(block.get_data_types());
            vectorized::JsonbSerializeUtil::block_to_jsonb(
                    *_tablet_schema, block, *row_store_column, num_columns, serdes,
                    {_tablet_schema->row_columns_uids().begin(),
                     _tablet_schema->row_columns_uids().end()});
            // Only the first row store column is handled.
            break;
        }
    }
    VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", row_column_id:" << row_column_id
               << ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us)"
               << watch.elapsed_time() / 1000;
}
// Looks `key` up in the existing data of the tablet and records the outcome for one
// row of a partial update.
//
// key                       encoded primary key of the row.
// segment_pos               row position inside the segment being written.
// have_input_seq_column     forwarded to the key lookup.
// have_delete_sign          whether the row carries a delete sign.
// specified_rowsets         rowsets to search.
// segment_caches            segment cache handles used by the lookup.
// has_default_or_nullable   set to true when the row's missing columns are not read
//                           from an old row.
// use_default_or_null_flag  receives exactly one entry for this row on success.
// found_cb                  invoked with the old row's location when it must be read.
// not_found_cb              invoked for a new key that carries no delete sign.
// stats                     per-call counters, updated according to the outcome.
//
// Returns the lookup error (other than KEY_NOT_FOUND / KEY_ALREADY_EXISTS) or the
// error returned by `not_found_cb`; otherwise OK.
Status SegmentWriter::probe_key_for_mow(
        std::string key, std::size_t segment_pos, bool have_input_seq_column, bool have_delete_sign,
        const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
        bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
        const std::function<void(const RowLocation& loc)>& found_cb,
        const std::function<Status()>& not_found_cb, PartialUpdateStats& stats) {
    RowLocation old_location;
    // save rowset shared ptr so this rowset wouldn't delete
    RowsetSharedPtr old_rowset;
    auto lookup_status = _tablet->lookup_row_key(
            key, _tablet_schema.get(), have_input_seq_column, specified_rowsets, &old_location,
            cast_set<uint32_t>(_mow_context->max_version), segment_caches, &old_rowset);

    // Case 1: the key does not exist yet.
    if (lookup_status.is<KEY_NOT_FOUND>()) {
        if (!have_delete_sign) {
            RETURN_IF_ERROR(not_found_cb());
        }
        ++stats.num_rows_new_added;
        has_default_or_nullable = true;
        use_default_or_null_flag.emplace_back(true);
        return Status::OK();
    }

    // Case 2: a genuine lookup failure.
    const bool key_already_exists = lookup_status.is<KEY_ALREADY_EXISTS>();
    if (!key_already_exists && !lookup_status.ok()) {
        LOG(WARNING) << "failed to lookup row key, error: " << lookup_status;
        return lookup_status;
    }

    // Case 3: the lookup reported KEY_ALREADY_EXISTS.
    if (key_already_exists) {
        has_default_or_nullable = true;
        use_default_or_null_flag.emplace_back(true);
        // although we need to mark delete current row, we still need to read missing columns
        // for this row, we need to ensure that each column is aligned
        _mow_context->delete_bitmap->add(
                {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
                cast_set<uint32_t>(segment_pos));
        ++stats.num_rows_deleted;
        return Status::OK();
    }

    // Case 4: the key was found in an old row, which gets replaced.
    // 1. if the delete sign is marked, it means that the value columns of the row will not
    //    be read. So we don't need to read the missing values from the previous rows.
    // 2. the one exception is when there are sequence columns in the table, we need to read
    //    the sequence columns, otherwise it may cause the merge-on-read based compaction
    //    policy to produce incorrect results
    // TODO(bobhan1): only read seq col rather than all columns in this situation for
    // partial update and flexible partial update
    // TODO(bobhan1): handle sequence column here
    if (have_delete_sign && !_tablet_schema->has_sequence_col()) {
        has_default_or_nullable = true;
        use_default_or_null_flag.emplace_back(true);
    } else {
        // partial update should not contain invisible columns
        use_default_or_null_flag.emplace_back(false);
        _rsid_to_rowset.emplace(old_rowset->rowset_id(), old_rowset);
        found_cb(old_location);
    }
    _mow_context->delete_bitmap->add(
            {old_location.rowset_id, old_location.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
            old_location.row_id);
    ++stats.num_rows_updated;
    return Status::OK();
}
// Verifies that a fixed partial update may be written through this writer.
//
// Checked in order: the table is a merge-on-write table, partial update info is
// present, the update is a *fixed* partial update, and `row_pos` is 0.
// Every violation trips a DCHECK (debug builds) and is returned as InternalError.
Status SegmentWriter::partial_update_preconditions_check(size_t row_pos) {
    // Shared failure path: assert in debug builds, report an error otherwise.
    auto reject = [](const std::string& msg) -> Status {
        DCHECK(false) << msg;
        return Status::InternalError<false>(msg);
    };

    if (!_is_mow()) {
        return reject(fmt::format(
                "Can only do partial update on merge-on-write unique table, but found: "
                "keys_type={}, _opts.enable_unique_key_merge_on_write={}, tablet_id={}",
                _tablet_schema->keys_type(), _opts.enable_unique_key_merge_on_write,
                _tablet->tablet_id()));
    }

    const auto& update_info = _opts.rowset_ctx->partial_update_info;
    if (update_info == nullptr) {
        return reject(fmt::format(
                "partial_update_info should not be nullptr, please check, tablet_id={}",
                _tablet->tablet_id()));
    }
    if (!update_info->is_fixed_partial_update()) {
        return reject(fmt::format(
                "in fixed partial update code, but update_mode={}, please check, tablet_id={}",
                update_info->update_mode(), _tablet->tablet_id()));
    }
    if (row_pos != 0) {
        return reject(fmt::format("row_pos should be 0, but found {}, tablet_id={}", row_pos,
                                  _tablet->tablet_id()));
    }
    return Status::OK();
}
// for partial update, we should do following steps to fill content of block:
// 1. set block data to data convertor, and get all key_column's converted slice
// 2. get pk of input block, and read missing columns
// 2.1 first find key location{rowset_id, segment_id, row_id}
// 2.2 build read plan to read by batch
// 2.3 fill block
// 3. set columns to data convertor and then write all columns
//
// Parameters:
//   block    input block; its columns are consumed positionally, one per entry of the
//            partial update info's `update_cids`.
//   row_pos  first row of `block` to write; must be 0 (see
//            partial_update_preconditions_check).
//   num_rows number of rows to write.
// Returns InvalidArgument when the block's column count is outside
// [num_key_columns, num_columns), InternalError when a precondition or the final
// consistency check fails, or any error propagated from conversion, key probing,
// reading missing columns or the column writers.
Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* block,
size_t row_pos, size_t num_rows) {
// The block must contain at least all key columns and strictly fewer columns than the
// full schema.
if (block->columns() < _tablet_schema->num_key_columns() ||
block->columns() >= _tablet_schema->num_columns()) {
return Status::InvalidArgument(
fmt::format("illegal partial update block columns: {}, num key columns: {}, total "
"schema columns: {}",
block->columns(), _tablet_schema->num_key_columns(),
_tablet_schema->num_columns()));
}
RETURN_IF_ERROR(partial_update_preconditions_check(row_pos));
// find missing column cids
const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
const auto& including_cids = _opts.rowset_ctx->partial_update_info->update_cids;
// create full block and fill with input columns
auto full_block = _tablet_schema->create_block();
size_t input_id = 0;
// The n-th input column is placed at schema position including_cids[n].
for (auto i : including_cids) {
full_block.replace_by_position(i, block->get_by_position(input_id++).column);
}
if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
_tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(vectorized::variant_util::parse_and_materialize_variant_columns(
full_block, *_tablet_schema, including_cids));
}
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
&full_block, row_pos, num_rows, including_cids));
bool have_input_seq_column = false;
// write including columns
std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
size_t segment_start_pos = 0;
for (auto cid : including_cids) {
// here we get segment column row num before append data.
// NOTE(review): the value is overwritten on every iteration, so the last included
// column wins — presumably all column writers are at the same row id; confirm.
segment_start_pos = _column_writers[cid]->get_next_rowid();
// olap data convertor always start from id = 0
auto converted_result = _olap_data_convertor->convert_column_data(cid);
if (!converted_result.first.ok()) {
return converted_result.first;
}
// Remember the key columns (and the sequence column, if supplied) for key encoding.
if (cid < _num_sort_key_columns) {
key_columns.push_back(converted_result.second);
} else if (_tablet_schema->has_sequence_col() &&
cid == _tablet_schema->sequence_col_idx()) {
seq_column = converted_result.second;
have_input_seq_column = true;
}
RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
converted_result.second->get_data(),
num_rows));
}
bool has_default_or_nullable = false;
// One entry per row, appended by probe_key_for_mow.
std::vector<bool> use_default_or_null_flag;
use_default_or_null_flag.reserve(num_rows);
// May be nullptr (checked before every use below).
const auto* delete_signs =
BaseTablet::get_delete_sign_column_data(full_block, row_pos + num_rows);
const std::vector<RowsetSharedPtr>& specified_rowsets = _mow_context->rowset_ptrs;
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
FixedReadPlan read_plan;
// locate rows in base data
PartialUpdateStats stats;
for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
// block segment
// 2 -> 0
// 3 -> 1
// 4 -> 2
// 5 -> 3
// here row_pos = 2, num_rows = 4.
size_t delta_pos = block_pos - row_pos;
size_t segment_pos = segment_start_pos + delta_pos;
std::string key = _full_encode_keys(key_columns, delta_pos);
// The row cache is keyed by the key without the sequence suffix appended below.
_maybe_invalid_row_cache(key);
if (have_input_seq_column) {
_encode_seq_column(seq_column, delta_pos, &key);
}
// If the table have sequence column, and the include-cids don't contain the sequence
// column, we need to update the primary key index builder at the end of this method.
// At that time, we have a valid sequence column to encode the key with seq col.
if (!_tablet_schema->has_sequence_col() || have_input_seq_column) {
RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
}
// mark key with delete sign as deleted.
bool have_delete_sign = (delete_signs != nullptr && delete_signs[block_pos] != 0);
// Invoked by probe_key_for_mow for a key that does not exist yet and has no delete sign.
auto not_found_cb = [&]() {
return _opts.rowset_ctx->partial_update_info->handle_new_key(
*_tablet_schema, [&]() -> std::string {
return block->dump_one_line(block_pos,
cast_set<int>(_num_sort_key_columns));
});
};
// Invoked by probe_key_for_mow when the old row has to be read to fill missing columns.
auto update_read_plan = [&](const RowLocation& loc) {
read_plan.prepare_to_read(loc, segment_pos);
};
RETURN_IF_ERROR(probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column,
have_delete_sign, specified_rowsets, segment_caches,
has_default_or_nullable, use_default_or_null_flag,
update_read_plan, not_found_cb, stats));
}
// Every successfully probed row contributed exactly one flag.
CHECK_EQ(use_default_or_null_flag.size(), num_rows);
if (config::enable_merge_on_write_correctness_check) {
_tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
*_mow_context->rowset_ids);
}
// read to fill full block
RETURN_IF_ERROR(read_plan.fill_missing_columns(
_opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block,
use_default_or_null_flag, has_default_or_nullable,
cast_set<uint32_t>(segment_start_pos), block));
// convert block to row store format
_serialize_block_to_row_column(full_block);
// convert missing columns and send to column writer
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
&full_block, row_pos, num_rows, missing_cids));
for (auto cid : missing_cids) {
auto converted_result = _olap_data_convertor->convert_column_data(cid);
if (!converted_result.first.ok()) {
return converted_result.first;
}
// The sequence column was not part of the input: pick it up from the filled block.
if (_tablet_schema->has_sequence_col() && !have_input_seq_column &&
cid == _tablet_schema->sequence_col_idx()) {
DCHECK_EQ(seq_column, nullptr);
seq_column = converted_result.second;
}
RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
converted_result.second->get_data(),
num_rows));
}
// Fold the per-call statistics into the writer's totals.
_num_rows_updated += stats.num_rows_updated;
_num_rows_deleted += stats.num_rows_deleted;
_num_rows_new_added += stats.num_rows_new_added;
_num_rows_filtered += stats.num_rows_filtered;
// Deferred primary key index build: in this case no key was added inside the probing
// loop above, because the sequence value was not known yet.
if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
DCHECK_NE(seq_column, nullptr);
if (_num_rows_written != row_pos ||
_primary_key_index_builder->num_rows() != _num_rows_written) {
return Status::InternalError(
"Correctness check failed, _num_rows_written: {}, row_pos: {}, primary key "
"index builder num rows: {}",
_num_rows_written, row_pos, _primary_key_index_builder->num_rows());
}
RETURN_IF_ERROR(
_generate_primary_key_index(_key_coders, key_columns, seq_column, num_rows, false));
}
_num_rows_written += num_rows;
DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
<< "primary key index builder num rows(" << _primary_key_index_builder->num_rows()
<< ") not equal to segment writer's num rows written(" << _num_rows_written << ")";
_olap_data_convertor->clear_source_content();
return Status::OK();
}
// Appends `num_rows` rows of `block`, starting at `row_pos`, to the segment.
//
// A fixed partial update on a direct, non-transient write is delegated to
// append_block_with_partial_content(); a flexible partial update is rejected with
// NotSupported. Otherwise every column writer receives its converted column and,
// when the writer was initialized with keys, the matching key index is extended:
//   * merge-on-write with cluster keys: primary key index + short key index (cluster keys)
//   * merge-on-write:                   primary key index
//   * everything else:                  short key index
// Returns InternalError when the block has fewer columns than there are column
// writers or a cluster key column cannot be located, or any error propagated from
// conversion, the column writers or the index generators.
Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos,
size_t num_rows) {
if (_opts.rowset_ctx->partial_update_info &&
_opts.rowset_ctx->partial_update_info->is_partial_update() &&
_opts.write_type == DataWriteType::TYPE_DIRECT &&
!_opts.rowset_ctx->is_transient_rowset_writer) {
if (_opts.rowset_ctx->partial_update_info->is_fixed_partial_update()) {
RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows));
} else {
return Status::NotSupported<false>(
"SegmentWriter doesn't support flexible partial update, please set "
"enable_vertical_segment_writer=true in be.conf on all BEs to use "
"VerticalSegmentWriter.");
}
return Status::OK();
}
if (block->columns() < _column_writers.size()) {
return Status::InternalError(
"block->columns() < _column_writers.size(), block->columns()=" +
std::to_string(block->columns()) +
", _column_writers.size()=" + std::to_string(_column_writers.size()) +
", _tablet_schema->dump_structure()=" + _tablet_schema->dump_structure());
}
// Re-asserts the condition already handled by the early return above, so it cannot
// fire here.
CHECK(block->columns() >= _column_writers.size())
<< ", block->columns()=" << block->columns()
<< ", _column_writers.size()=" << _column_writers.size()
<< ", _tablet_schema->dump_structure()=" << _tablet_schema->dump_structure();
// Row column should be filled here when it's a directly write from memtable
// or it's schema change write(since column data type maybe changed, so we should rebuild)
if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
_serialize_block_to_row_column(*block);
}
if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
_tablet_schema->num_variant_columns() > 0) {
// The variant columns are materialized in place, hence the const_cast on the input block.
RETURN_IF_ERROR(vectorized::variant_util::parse_and_materialize_variant_columns(
const_cast<vectorized::Block&>(*block), *_tablet_schema, _column_ids));
}
_olap_data_convertor->set_source_content(block, row_pos, num_rows);
// find all row pos for short key indexes
// (positions are relative to the first row appended by this call)
std::vector<size_t> short_key_pos;
if (_has_key) {
// We build a short key index every `_opts.num_rows_per_block` rows. Specifically, we
// build a short key index using 1st rows for first block and `_short_key_row_pos - _row_count`
// for next blocks.
// Ensure we build a short key index using 1st rows only for the first block (ISSUE-9766).
if (UNLIKELY(_short_key_row_pos == 0 && _num_rows_written == 0)) {
short_key_pos.push_back(0);
}
while (_short_key_row_pos + _opts.num_rows_per_block < _num_rows_written + num_rows) {
_short_key_row_pos += _opts.num_rows_per_block;
short_key_pos.push_back(_short_key_row_pos - _num_rows_written);
}
}
// convert column data from engine format to storage layer format
std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
for (size_t id = 0; id < _column_writers.size(); ++id) {
// olap data convertor always start from id = 0
auto converted_result = _olap_data_convertor->convert_column_data(id);
if (!converted_result.first.ok()) {
return converted_result.first;
}
// `id` indexes the writers; `cid` is the corresponding schema column id.
auto cid = _column_ids[id];
if (_has_key && cid < _tablet_schema->num_key_columns()) {
key_columns.push_back(converted_result.second);
} else if (_has_key && _tablet_schema->has_sequence_col() &&
cid == _tablet_schema->sequence_col_idx()) {
seq_column = converted_result.second;
}
RETURN_IF_ERROR(_column_writers[id]->append(converted_result.second->get_nullmap(),
converted_result.second->get_data(), num_rows));
}
// Variant statistics are only calculated here for compaction writes.
if (_opts.write_type == DataWriteType::TYPE_COMPACTION) {
RETURN_IF_ERROR(
_variant_stats_calculator->calculate_variant_stats(block, row_pos, num_rows));
}
if (_has_key) {
if (_is_mow_with_cluster_key()) {
// for now we don't need to query short key index for CLUSTER BY feature,
// but we still write the index for future usage.
// 1. generate primary key index, the key_columns is primary_key_columns
RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns,
seq_column, num_rows, true));
// 2. generate short key index (use cluster key)
key_columns.clear();
for (const auto& cid : _tablet_schema->cluster_key_uids()) {
// find cluster key index in tablet schema
auto cluster_key_index = _tablet_schema->field_index(cid);
if (cluster_key_index == -1) {
return Status::InternalError(
"could not find cluster key column with unique_id=" +
std::to_string(cid) + " in tablet schema");
}
// Map the schema index back to the writer position of that column.
bool found = false;
for (auto i = 0; i < _column_ids.size(); ++i) {
if (_column_ids[i] == cluster_key_index) {
auto converted_result = _olap_data_convertor->convert_column_data(i);
if (!converted_result.first.ok()) {
return converted_result.first;
}
key_columns.push_back(converted_result.second);
found = true;
break;
}
}
if (!found) {
return Status::InternalError(
"could not found cluster key column with unique_id=" +
std::to_string(cid) +
", tablet schema index=" + std::to_string(cluster_key_index));
}
}
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
} else if (_is_mow()) {
RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
num_rows, false));
} else {
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
}
}
_num_rows_written += num_rows;
_olap_data_convertor->clear_source_content();
return Status::OK();
}
// Returns how many more rows may be appended to this segment.
//
// row_avg_size_in_bytes  estimated average size of one row.
//
// The result is the smaller of
//   * the rows that still fit below MAX_SEGMENT_SIZE given the average row size, and
//   * the rows left until `_opts.max_rows_per_segment` is reached.
// 0 is returned once either limit has been reached. An average row size of 0 cannot
// bound the segment by size (and must not be used as a divisor), so in that case only
// the row-count limit applies.
int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) {
    auto segment_size = estimate_segment_size();
    if (segment_size >= MAX_SEGMENT_SIZE || _num_rows_written >= _opts.max_rows_per_segment)
            [[unlikely]] {
        return 0;
    }
    const int64_t count_rows = static_cast<int64_t>(_opts.max_rows_per_segment) -
                               static_cast<int64_t>(_num_rows_written);
    if (row_avg_size_in_bytes == 0) {
        // Guard against division by zero.
        return count_rows;
    }
    // Positive thanks to the guard above.
    const int64_t remaining_bytes =
            static_cast<int64_t>(MAX_SEGMENT_SIZE) - static_cast<int64_t>(segment_size);
    const int64_t size_rows = static_cast<int64_t>(static_cast<uint64_t>(remaining_bytes) /
                                                   row_avg_size_in_bytes);
    return std::min(size_rows, count_rows);
}
// Encodes row `pos` of all sort key columns with the writer's own key coders.
// Thin wrapper around the overload that takes the coders explicitly; in debug builds
// it asserts that coders, index sizes and columns all match the sort key column count.
std::string SegmentWriter::_full_encode_keys(
        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos,
        bool null_first) {
    assert(_key_index_size.size() == _num_sort_key_columns);
    assert(key_columns.size() == _num_sort_key_columns);
    assert(_key_coders.size() == _num_sort_key_columns);
    return _full_encode_keys(_key_coders, key_columns, pos, null_first);
}
// Encodes row `pos` of `key_columns` into one key string.
//
// key_coders   one coder per key column, same order as `key_columns`.
// key_columns  converted key columns.
// pos          row to encode.
// null_first   selects the marker written for a null value: KEY_NULL_FIRST_MARKER
//              when true, KEY_NORMAL_MARKER when false.
//
// For each column a one-byte marker is written; non-null values are followed by the
// coder's full ascending encoding, null values contribute the marker only.
std::string SegmentWriter::_full_encode_keys(
        const std::vector<const KeyCoder*>& key_coders,
        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos,
        bool null_first) {
    assert(key_columns.size() == key_coders.size());
    std::string encoded_keys;
    for (size_t idx = 0; idx < key_columns.size(); ++idx) {
        auto field = key_columns[idx]->get_data_at(pos);
        if (UNLIKELY(field == nullptr)) {
            // Null value: marker only.
            if (null_first) {
                encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
            } else {
                encoded_keys.push_back(KEY_NORMAL_MARKER);
            }
        } else {
            encoded_keys.push_back(KEY_NORMAL_MARKER);
            DCHECK(key_coders[idx] != nullptr);
            key_coders[idx]->full_encode_ascending(field, &encoded_keys);
        }
    }
    return encoded_keys;
}
// Appends the encoded sequence-column value of row `pos` to `encoded_keys`.
//   - non-null value: KEY_NORMAL_MARKER followed by the `_seq_coder` full ascending encoding
//   - null value:     KEY_NULL_FIRST_MARKER followed by N copies of KEY_MINIMAL_MARKER,
//                     where N is the `length()` of the tablet schema's sequence column
void SegmentWriter::_encode_seq_column(const vectorized::IOlapColumnDataAccessor* seq_column,
                                       size_t pos, std::string* encoded_keys) {
    auto seq_value = seq_column->get_data_at(pos);
    if (seq_value) {
        encoded_keys->push_back(KEY_NORMAL_MARKER);
        _seq_coder->full_encode_ascending(seq_value, encoded_keys);
        return;
    }
    // Null sequence value: to facilitate the use of the primary key index, encode it as
    // the minimum value of the corresponding length.
    encoded_keys->push_back(KEY_NULL_FIRST_MARKER);
    const auto& seq_col_meta = _tablet_schema->column(_tablet_schema->sequence_col_idx());
    size_t padding_len = seq_col_meta.length();
    encoded_keys->append(padding_len, KEY_MINIMAL_MARKER);
}
// Appends the row id to `encoded_keys`: one KEY_NORMAL_MARKER byte followed by the
// `_rowid_coder` full ascending encoding of `rowid`.
void SegmentWriter::_encode_rowid(const uint32_t rowid, std::string* encoded_keys) {
encoded_keys->push_back(KEY_NORMAL_MARKER);
_rowid_coder->full_encode_ascending(&rowid, encoded_keys);
}
// Builds the short-key encoding of row `pos`.
//
// `key_columns` must contain exactly `_num_short_key_columns` entries (asserted). Each
// column contributes one marker byte; a non-null cell is then encoded with
// `_key_coders[i]->encode_ascending`, which is also given `_key_index_size[i]`.
// A null cell is represented by KEY_NULL_FIRST_MARKER alone.
std::string SegmentWriter::_encode_keys(
        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos) {
    assert(key_columns.size() == _num_short_key_columns);
    std::string short_key;
    for (size_t idx = 0; idx < key_columns.size(); ++idx) {
        auto cell = key_columns[idx]->get_data_at(pos);
        if (UNLIKELY(!cell)) {
            short_key.push_back(KEY_NULL_FIRST_MARKER);
            continue;
        }
        short_key.push_back(KEY_NORMAL_MARKER);
        _key_coders[idx]->encode_ascending(cell, _key_index_size[idx], &short_key);
    }
    return short_key;
}
// Appends a single row to the segment.
//
// Steps:
//   1. Reject merge-on-write tables with cluster keys up front. This path does not
//      support them, and failing before any column writer is touched leaves the
//      writer without a partially appended row.
//   2. Append every cell of `row` to the matching column writer.
//   3. Build the full encoded key over `_num_sort_key_columns` columns; when the schema
//      has a sequence column, a KEY_NORMAL_MARKER and the encoded sequence value are
//      appended to it.
//   4. Merge-on-write tables add the full key to the primary key index. Other tables
//      add a short key index entry at the start of each block of
//      `_opts.num_rows_per_block` rows and update the min/max key.
//
// Returns the first error reported by a column writer or an index builder, or
// InternalError for merge-on-write tables with cluster keys.
template <typename RowType>
Status SegmentWriter::append_row(const RowType& row) {
    if (_is_mow_with_cluster_key()) {
        return Status::InternalError(
                "SegmentWriter::append_row does not support mow tables with cluster key");
    }
    for (size_t cid = 0; cid < _column_writers.size(); ++cid) {
        auto cell = row.cell(cast_set<uint32_t>(cid));
        RETURN_IF_ERROR(_column_writers[cid]->append(cell));
    }
    std::string full_encoded_key;
    encode_key<RowType, true>(&full_encoded_key, row, _num_sort_key_columns);
    if (_tablet_schema->has_sequence_col()) {
        full_encoded_key.push_back(KEY_NORMAL_MARKER);
        auto cid = _tablet_schema->sequence_col_idx();
        auto cell = row.cell(cid);
        row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &full_encoded_key);
    }
    if (_is_mow()) {
        RETURN_IF_ERROR(_primary_key_index_builder->add_item(full_encoded_key));
    } else {
        // At the beginning of one block, so add a short key index entry
        if ((_num_rows_written % _opts.num_rows_per_block) == 0) {
            std::string encoded_key;
            encode_key(&encoded_key, row, _num_short_key_columns);
            RETURN_IF_ERROR(_short_key_index_builder->add_item(encoded_key));
        }
        set_min_max_key(full_encoded_key);
    }
    ++_num_rows_written;
    return Status::OK();
}
template Status SegmentWriter::append_row(const RowCursor& row);
// TODO(lingbin): Currently this function does not include the size of various indexes,
// We should make this more precise.
// NOTE: This function will be called when any row of data is added, so we need to
// make this function efficient.
uint64_t SegmentWriter::estimate_segment_size() {
// footer_size(4) + checksum(4) + segment_magic(4)
uint64_t size = 12;
for (auto& column_writer : _column_writers) {
size += column_writer->estimate_buffer_size();
}
if (_is_mow_with_cluster_key()) {
size += _primary_key_index_builder->size() + _short_key_index_builder->size();
} else if (_is_mow()) {
size += _primary_key_index_builder->size();
} else {
size += _short_key_index_builder->size();
}
// update the mem_tracker of segment size
_mem_tracker->consume(size - _mem_tracker->consumption());
return size;
}
// Finishes every column writer and writes the column data to the file.
//
// When the segment has keys, `_row_count` is taken from `_num_rows_written`. Otherwise
// the two must already agree; a mismatch is logged and returned as InternalError (and
// trips a DCHECK in debug builds). `_num_rows_written` is reset to 0 afterwards.
Status SegmentWriter::finalize_columns_data() {
    if (!_has_key) {
        DCHECK(_row_count == _num_rows_written)
                << "_row_count != _num_rows_written:" << _row_count << " vs. " << _num_rows_written;
        if (_row_count != _num_rows_written) {
            std::stringstream mismatch_msg;
            mismatch_msg << "_row_count != _num_rows_written:" << _row_count << " vs. "
                         << _num_rows_written;
            LOG(WARNING) << mismatch_msg.str();
            return Status::InternalError(mismatch_msg.str());
        }
    } else {
        _row_count = _num_rows_written;
    }
    _num_rows_written = 0;
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->finish());
    }
    return _write_data();
}
// Writes the index data that follows the column data and reports its size in `*index_size`.
//
// Write order: ordinal index, zone map, inverted index, ANN index and bloom filter index
// for every column writer; then, only when `_has_key`, the key index:
//   - mow with cluster key: sort the buffered `_primary_keys`, feed them to the primary
//     key index builder, write the short key index, then write the primary key index
//   - mow:                  write the primary key index only
//   - otherwise:            write the short key index only
//
// `*index_size` is the file-offset delta measured after the last per-column index or
// short key index write, plus `_primary_key_index_builder->disk_size()` whenever a
// primary key index is written.
//
// On success, clear() is called before returning, so the column writers and the data
// convertor are released. Early error returns skip that call.
Status SegmentWriter::finalize_columns_index(uint64_t* index_size) {
// File offset at which index data begins; all byte deltas below are relative to it.
uint64_t index_start = _file_writer->bytes_appended();
RETURN_IF_ERROR(_write_ordinal_index());
RETURN_IF_ERROR(_write_zone_map());
RETURN_IF_ERROR(_write_inverted_index());
RETURN_IF_ERROR(_write_ann_index());
RETURN_IF_ERROR(_write_bloom_filter_index());
*index_size = _file_writer->bytes_appended() - index_start;
if (_has_key) {
if (_is_mow_with_cluster_key()) {
// 1. sort primary keys
std::sort(_primary_keys.begin(), _primary_keys.end());
// 2. write primary keys index
// `last_key` holds the previously added key, used only by the ordering DCHECK below.
std::string last_key;
for (const auto& key : _primary_keys) {
DCHECK(key.compare(last_key) > 0)
<< "found duplicate key or key is not sorted! current key: " << key
<< ", last key: " << last_key;
RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
last_key = key;
}
RETURN_IF_ERROR(_write_short_key_index());
// Re-measure so the short key index page is included in the delta.
*index_size = _file_writer->bytes_appended() - index_start;
RETURN_IF_ERROR(_write_primary_key_index());
*index_size += _primary_key_index_builder->disk_size();
} else if (_is_mow()) {
RETURN_IF_ERROR(_write_primary_key_index());
// IndexedColumnWriter write data pages mixed with segment data, we should use
// the stat from primary key index builder.
*index_size += _primary_key_index_builder->disk_size();
} else {
RETURN_IF_ERROR(_write_short_key_index());
*index_size = _file_writer->bytes_appended() - index_start;
}
}
// reset all column writers and the data convertor
clear();
return Status::OK();
}
// Writes the segment footer, closes the file writer, and stores the total number of
// bytes appended to the file in `*segment_file_size`.
// Returns Corruption if the resulting size is 0.
Status SegmentWriter::finalize_footer(uint64_t* segment_file_size) {
    RETURN_IF_ERROR(_write_footer());
    // finish
    RETURN_IF_ERROR(_file_writer->close(true));
    const auto total_bytes = _file_writer->bytes_appended();
    *segment_file_size = total_bytes;
    if (total_bytes == 0) {
        return Status::Corruption("Bad segment, file size = 0");
    }
    return Status::OK();
}
// Flushes the whole segment: column data, then indexes, then the footer.
//
// Before writing anything, the target data dir (if any) is asked whether the estimated
// segment size would exceed its capacity limit; if so DISK_REACH_CAPACITY_LIMIT is
// returned. `*index_size` and `*segment_file_size` are filled in by
// finalize_columns_index() and finalize_footer() respectively.
//
// A flush taking longer than 5000000000 ns is logged. In cloud mode, when the file
// writer has a cache builder whose expiration time is 0, the file-cache blocks of the
// allocated range are switched to the INDEX cache type.
Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size) {
    MonotonicStopWatch flush_timer;
    flush_timer.start();
    // check disk capacity
    if (_data_dir != nullptr && _data_dir->reach_capacity_limit((int64_t)estimate_segment_size())) {
        return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit, path: {}",
                                                        _data_dir->path_hash(), _data_dir->path());
    }
    // write data
    RETURN_IF_ERROR(finalize_columns_data());
    // Get the index start before finalize_footer since this function would write new data.
    uint64_t index_begin_offset = _file_writer->bytes_appended();
    // write index
    RETURN_IF_ERROR(finalize_columns_index(index_size));
    // write footer
    RETURN_IF_ERROR(finalize_footer(segment_file_size));
    if (flush_timer.elapsed_time() > 5000000000l) {
        LOG(INFO) << "segment flush consumes a lot time_ns " << flush_timer.elapsed_time()
                  << ", segmemt_size " << *segment_file_size;
    }
    // When the cache type is not ttl(expiration time == 0), the data should be split into normal cache queue
    // and index cache queue
    auto* cache_builder = _file_writer->cache_builder();
    if (cache_builder != nullptr && cache_builder->_expiration_time == 0 &&
        config::is_cloud_mode()) {
        // NOTE(review): the range length is index size plus whole file size, as in the
        // original code; confirm this is the intended extent for the index cache range.
        auto cache_range_size = *index_size + *segment_file_size;
        auto cache_holder = cache_builder->allocate_cache_holder(index_begin_offset,
                                                                 cache_range_size,
                                                                 _tablet->tablet_id());
        for (auto& file_block : cache_holder->file_blocks) {
            static_cast<void>(file_block->change_cache_type(io::FileCacheType::INDEX));
        }
    }
    return Status::OK();
}
// Releases the per-segment write state: every column writer is reset in index order,
// then the writer list and the column id list are emptied and the data convertor is
// released.
void SegmentWriter::clear() {
    for (size_t idx = 0; idx < _column_writers.size(); ++idx) {
        _column_writers[idx].reset();
    }
    _column_writers.clear();
    _column_ids.clear();
    _olap_data_convertor.reset();
}
// Writes the column data to the file, one column writer at a time.
// After each writer has written its data, the byte counters of its column meta
// (compressed, uncompressed and raw data bytes) are increased by the writer's totals;
// a counter that is not set yet is treated as 0.
Status SegmentWriter::_write_data() {
    for (auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_data());
        auto* meta = writer->get_column_meta();
        DCHECK(meta != nullptr);

        const auto prev_compressed =
                meta->has_compressed_data_bytes() ? meta->compressed_data_bytes() : 0;
        meta->set_compressed_data_bytes(prev_compressed +
                                        writer->get_total_compressed_data_pages_bytes());

        const auto prev_uncompressed =
                meta->has_uncompressed_data_bytes() ? meta->uncompressed_data_bytes() : 0;
        meta->set_uncompressed_data_bytes(prev_uncompressed +
                                          writer->get_total_uncompressed_data_pages_bytes());

        const auto prev_raw = meta->has_raw_data_bytes() ? meta->raw_data_bytes() : 0;
        meta->set_raw_data_bytes(prev_raw + writer->get_raw_data_bytes());
    }
    return Status::OK();
}
// Writes the ordinal index of every column; must run after the column data has been
// written. Stops at, and returns, the first error.
Status SegmentWriter::_write_ordinal_index() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_ordinal_index());
    }
    return Status::OK();
}
// Asks every column writer to write its zone map. Stops at, and returns, the first error.
Status SegmentWriter::_write_zone_map() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_zone_map());
    }
    return Status::OK();
}
// Asks every column writer to write its inverted index. Stops at, and returns, the
// first error.
Status SegmentWriter::_write_inverted_index() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_inverted_index());
    }
    return Status::OK();
}
// Asks every column writer to write its ANN index. Stops at, and returns, the first error.
Status SegmentWriter::_write_ann_index() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_ann_index());
    }
    return Status::OK();
}
// Asks every column writer to write its bloom filter index. Stops at, and returns, the
// first error.
Status SegmentWriter::_write_bloom_filter_index() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_bloom_filter_index());
    }
    return Status::OK();
}
// Finalizes the short key index builder for `_row_count` rows, writes the resulting
// page to the file, and records the page's location in the segment footer.
Status SegmentWriter::_write_short_key_index() {
    PageFooterPB page_footer;
    std::vector<Slice> page_body;
    RETURN_IF_ERROR(_short_key_index_builder->finalize(_row_count, &page_body, &page_footer));
    // short key index page is not compressed right now
    PagePointer page_ptr;
    RETURN_IF_ERROR(PageIO::write_page(_file_writer, page_body, page_footer, &page_ptr));
    page_ptr.to_proto(_footer.mutable_short_key_index_page());
    return Status::OK();
}
// Finalizes the primary key index builder, storing its meta in the segment footer.
// CHECK-fails if the number of rows in the builder differs from `_row_count`.
Status SegmentWriter::_write_primary_key_index() {
CHECK_EQ(_primary_key_index_builder->num_rows(), _row_count);
return _primary_key_index_builder->finalize(_footer.mutable_primary_key_index_meta());
}
// Serializes the segment footer and appends it to the file.
//
// The row count is stamped into the footer first. When the tablet schema uses external
// segment column meta, the footer version is set to
// SEGMENT_FOOTER_VERSION_V3_EXT_COL_META and the external column meta is written
// before the footer itself.
//
// On-disk layout of what is appended here:
//   SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
//
// Returns InternalError if the footer cannot be serialized, otherwise the status of
// the underlying writes.
Status SegmentWriter::_write_footer() {
    _footer.set_num_rows(_row_count);
    // Decide whether to externalize ColumnMetaPB by tablet default, and stamp footer version
    if (_tablet_schema->is_external_segment_column_meta_used()) {
        _footer.set_version(SEGMENT_FOOTER_VERSION_V3_EXT_COL_META);
        VLOG_DEBUG << "use external column meta";
        // External ColumnMetaPB writing (optional)
        RETURN_IF_ERROR(ExternalColMetaUtil::write_external_column_meta(
                _file_writer, &_footer, _opts.compression_type,
                [this](const std::vector<Slice>& slices) { return _write_raw_data(slices); }));
    }

    std::string serialized_footer;
    VLOG_DEBUG << "footer " << _footer.DebugString();
    if (!_footer.SerializeToString(&serialized_footer)) {
        return Status::InternalError("failed to serialize segment footer");
    }

    // Fixed-size trailer: footer size, footer checksum, magic number.
    faststring trailer;
    put_fixed32_le(&trailer, cast_set<uint32_t>(serialized_footer.size()));
    uint32_t footer_crc = crc32c::Crc32c(serialized_footer.data(), serialized_footer.size());
    put_fixed32_le(&trailer, footer_crc);
    // Append magic number. we don't write magic number in the header because
    // that will need an extra seek when reading
    trailer.append(k_segment_magic, k_segment_magic_length);

    std::vector<Slice> slices {serialized_footer, trailer};
    return _write_raw_data(slices);
}
// Appends `slices`, in order, to the segment file through the file writer.
// Uses `slices.data()` rather than `&slices[0]` so that an empty vector is passed as
// (pointer, 0) without indexing into an empty container.
Status SegmentWriter::_write_raw_data(const std::vector<Slice>& slices) {
    RETURN_IF_ERROR(_file_writer->appendv(slices.data(), slices.size()));
    return Status::OK();
}
// Returns the smallest encoded key of the segment: the primary key index builder's
// min key when such a builder exists, otherwise a view over `_min_key`.
Slice SegmentWriter::min_encoded_key() {
    if (_primary_key_index_builder != nullptr) {
        return _primary_key_index_builder->min_key();
    }
    return Slice(_min_key.data(), _min_key.size());
}
// Returns the largest encoded key of the segment: the primary key index builder's
// max key when such a builder exists, otherwise a view over `_max_key`.
Slice SegmentWriter::max_encoded_key() {
    if (_primary_key_index_builder != nullptr) {
        return _primary_key_index_builder->max_key();
    }
    return Slice(_max_key.data(), _max_key.size());
}
// Updates the tracked min/max keys with `key`.
// The min key is recorded only for the first key seen (while `_is_first_row` is set);
// the max key is replaced whenever `key` compares greater than the current max.
void SegmentWriter::set_min_max_key(const Slice& key) {
    if (UNLIKELY(_is_first_row)) {
        _min_key.append(key.get_data(), key.get_size());
        _is_first_row = false;
    }
    if (key.compare(_max_key) <= 0) {
        // Not larger than the current max: nothing to update.
        return;
    }
    _max_key.clear();
    _max_key.append(key.get_data(), key.get_size());
}
// Records `key` as the min key, but only on the first call: once `_is_first_row` has
// been cleared, later calls leave `_min_key` untouched.
void SegmentWriter::set_min_key(const Slice& key) {
if (UNLIKELY(_is_first_row)) {
_min_key.append(key.get_data(), key.get_size());
_is_first_row = false;
}
}
// Unconditionally replaces `_max_key` with a copy of the bytes of `key`.
// No comparison against the previous max key is made here.
void SegmentWriter::set_max_key(const Slice& key) {
_max_key.clear();
_max_key.append(key.get_data(), key.get_size());
}
// Stores the merge-on-write context used by this writer.
// The parameter is taken by value and moved into the member, so handing it over costs
// no extra shared_ptr reference-count increment/decrement.
void SegmentWriter::set_mow_context(std::shared_ptr<MowContext> mow_context) {
    _mow_context = std::move(mow_context);
}
// Produces primary key index entries for `num_rows` rows of the current block.
//
// need_sort == false (mow table without cluster key):
//   each row's key is fully encoded with the writer's own `_key_coders`, optionally
//   extended with the sequence column, and added straight to the primary key index
//   builder. Debug builds check that the keys within the block are strictly increasing.
//
// need_sort == true (mow table with cluster key):
//   each row's key is fully encoded with `primary_key_coders`, optionally extended with
//   the sequence column, then with the row id (`pos + _num_rows_written`), and buffered
//   in `_primary_keys`; `_primary_keys_size` tracks the buffered bytes.
//
// In both modes `_maybe_invalid_row_cache` is called with the key before the sequence
// column is appended.
Status SegmentWriter::_generate_primary_key_index(
        const std::vector<const KeyCoder*>& primary_key_coders,
        const std::vector<vectorized::IOlapColumnDataAccessor*>& primary_key_columns,
        vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort) {
    const bool has_seq_col = _tablet_schema->has_sequence_col();

    if (need_sort) { // mow table with cluster key
        // generate primary keys in memory
        for (uint32_t row = 0; row < num_rows; row++) {
            std::string encoded = _full_encode_keys(primary_key_coders, primary_key_columns, row);
            _maybe_invalid_row_cache(encoded);
            if (has_seq_col) {
                _encode_seq_column(seq_column, row, &encoded);
            }
            _encode_rowid(row + _num_rows_written, &encoded);
            _primary_keys_size += encoded.size();
            _primary_keys.emplace_back(std::move(encoded));
        }
        return Status::OK();
    }

    // mow table without cluster key
    std::string prev_key;
    for (size_t row = 0; row < num_rows; row++) {
        // use _key_coders
        std::string encoded = _full_encode_keys(primary_key_columns, row);
        _maybe_invalid_row_cache(encoded);
        if (has_seq_col) {
            _encode_seq_column(seq_column, row, &encoded);
        }
        DCHECK(encoded.compare(prev_key) > 0)
                << "found duplicate key or key is not sorted! current key: " << encoded
                << ", last key: " << prev_key;
        RETURN_IF_ERROR(_primary_key_index_builder->add_item(encoded));
        prev_key = std::move(encoded);
    }
    return Status::OK();
}
// Updates the min/max key and adds short key index entries for the current block.
//
// key_columns:   accessors for the sort key columns. NOTE: this vector is resized to
//                `_num_short_key_columns` as a side effect.
// num_rows:      number of rows in the block. An empty block is a no-op: there is no
//                first or last row to encode, and `num_rows - 1` would wrap around.
// short_key_pos: row positions within the block that get a short key index entry.
//
// The min key is taken from row 0 (only recorded the first time, see set_min_key) and
// the max key from row `num_rows - 1`, both using the full key encoding with
// `_key_coders`. Returns the first error reported by the short key index builder.
Status SegmentWriter::_generate_short_key_index(
        std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t num_rows,
        const std::vector<size_t>& short_key_pos) {
    if (UNLIKELY(num_rows == 0)) {
        return Status::OK();
    }
    // use _key_coders
    set_min_key(_full_encode_keys(key_columns, 0));
    set_max_key(_full_encode_keys(key_columns, num_rows - 1));
    DCHECK(Slice(_max_key.data(), _max_key.size())
                   .compare(Slice(_min_key.data(), _min_key.size())) >= 0)
            << "key is not sorted! min key: " << _min_key << ", max key: " << _max_key;
    // Only the leading short key columns take part in the short key encoding.
    key_columns.resize(_num_short_key_columns);
    std::string last_key;
    for (const auto pos : short_key_pos) {
        std::string key = _encode_keys(key_columns, pos);
        DCHECK(key.compare(last_key) >= 0)
                << "key is not sorted! current key: " << key << ", last key: " << last_key;
        RETURN_IF_ERROR(_short_key_index_builder->add_item(key));
        last_key = std::move(key);
    }
    return Status::OK();
}
// True when the tablet uses UNIQUE_KEYS and the writer options enable
// unique-key merge-on-write.
inline bool SegmentWriter::_is_mow() {
return _tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write;
}
// True when _is_mow() holds and the tablet schema lists at least one cluster key uid.
inline bool SegmentWriter::_is_mow_with_cluster_key() {
return _is_mow() && !_tablet_schema->cluster_key_uids().empty();
}
#include "common/compile_check_end.h"
} // namespace segment_v2
} // namespace doris