blob: 5e6082c5c13938a73ac7aebc6bd648e6e7746c67 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/partial_update_info.h"
#include <gen_cpp/olap_file.pb.h>
#include <cstdint>
#include "common/consts.h"
#include "common/logging.h"
#include "olap/base_tablet.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/vertical_segment_writer.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/bitmap_value.h"
#include "vec/common/assert_cast.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_number.h" // IWYU pragma: keep
#include "vec/olap/olap_data_convertor.h"
namespace doris {
#include "common/compile_check_begin.h"
Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
UniqueKeyUpdateModePB unique_key_update_mode,
PartialUpdateNewRowPolicyPB policy,
const std::set<std::string>& partial_update_cols,
bool is_strict_mode_, int64_t timestamp_ms_, int32_t nano_seconds_,
const std::string& timezone_,
const std::string& auto_increment_column,
int32_t sequence_map_col_uid, int64_t cur_max_version) {
partial_update_mode = unique_key_update_mode;
partial_update_new_key_policy = policy;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
sequence_map_col_unqiue_id = sequence_map_col_uid;
timestamp_ms = timestamp_ms_;
nano_seconds = nano_seconds_;
timezone = timezone_;
missing_cids.clear();
update_cids.clear();
if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
// partial_update_cols should include all key columns
for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) {
const auto key_col = tablet_schema.column(i);
if (!partial_update_cols.contains(key_col.name())) {
auto msg = fmt::format(
"Unable to do partial update on shadow index's tablet, tablet_id={}, "
"txn_id={}. Missing key column {}.",
tablet_id, txn_id, key_col.name());
LOG_WARNING(msg);
return Status::Aborted<false>(msg);
}
}
}
for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
auto tablet_column = tablet_schema.column(i);
if (!partial_update_input_columns.contains(tablet_column.name())) {
missing_cids.emplace_back(i);
if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
tablet_schema.auto_increment_column() != tablet_column.name()) {
can_insert_new_rows_in_partial_update = false;
}
} else {
update_cids.emplace_back(i);
}
if (auto_increment_column == tablet_column.name()) {
is_schema_contains_auto_inc_column = true;
}
} else {
// in flexible partial update, missing cids is all non sort keys' cid
if (i >= tablet_schema.num_key_columns()) {
missing_cids.emplace_back(i);
}
}
}
is_strict_mode = is_strict_mode_;
is_input_columns_contains_auto_inc_column =
is_fixed_partial_update() &&
partial_update_input_columns.contains(auto_increment_column);
_generate_default_values_for_missing_cids(tablet_schema);
return Status::OK();
}
void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
partial_update_info_pb->set_partial_update_mode(partial_update_mode);
partial_update_info_pb->set_partial_update_new_key_policy(partial_update_new_key_policy);
partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase);
for (const auto& col : partial_update_input_columns) {
partial_update_info_pb->add_partial_update_input_columns(col);
}
for (auto cid : missing_cids) {
partial_update_info_pb->add_missing_cids(cid);
}
for (auto cid : update_cids) {
partial_update_info_pb->add_update_cids(cid);
}
partial_update_info_pb->set_can_insert_new_rows_in_partial_update(
can_insert_new_rows_in_partial_update);
partial_update_info_pb->set_is_strict_mode(is_strict_mode);
partial_update_info_pb->set_timestamp_ms(timestamp_ms);
partial_update_info_pb->set_nano_seconds(nano_seconds);
partial_update_info_pb->set_timezone(timezone);
partial_update_info_pb->set_is_input_columns_contains_auto_inc_column(
is_input_columns_contains_auto_inc_column);
partial_update_info_pb->set_is_schema_contains_auto_inc_column(
is_schema_contains_auto_inc_column);
for (const auto& value : default_values) {
partial_update_info_pb->add_default_values(value);
}
}
void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
if (!partial_update_info_pb->has_partial_update_mode()) {
// for backward compatibility
if (partial_update_info_pb->is_partial_update()) {
partial_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
} else {
partial_update_mode = UniqueKeyUpdateModePB::UPSERT;
}
} else {
partial_update_mode = partial_update_info_pb->partial_update_mode();
}
if (partial_update_info_pb->has_partial_update_new_key_policy()) {
partial_update_new_key_policy = partial_update_info_pb->partial_update_new_key_policy();
}
max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase()
? partial_update_info_pb->max_version_in_flush_phase()
: -1;
partial_update_input_columns.clear();
for (const auto& col : partial_update_info_pb->partial_update_input_columns()) {
partial_update_input_columns.insert(col);
}
missing_cids.clear();
for (auto cid : partial_update_info_pb->missing_cids()) {
missing_cids.push_back(cid);
}
update_cids.clear();
for (auto cid : partial_update_info_pb->update_cids()) {
update_cids.push_back(cid);
}
can_insert_new_rows_in_partial_update =
partial_update_info_pb->can_insert_new_rows_in_partial_update();
is_strict_mode = partial_update_info_pb->is_strict_mode();
timestamp_ms = partial_update_info_pb->timestamp_ms();
timezone = partial_update_info_pb->timezone();
is_input_columns_contains_auto_inc_column =
partial_update_info_pb->is_input_columns_contains_auto_inc_column();
is_schema_contains_auto_inc_column =
partial_update_info_pb->is_schema_contains_auto_inc_column();
if (partial_update_info_pb->has_nano_seconds()) {
nano_seconds = partial_update_info_pb->nano_seconds();
}
default_values.clear();
for (const auto& value : partial_update_info_pb->default_values()) {
default_values.push_back(value);
}
}
std::string PartialUpdateInfo::summary() const {
std::string mode;
switch (partial_update_mode) {
case UniqueKeyUpdateModePB::UPSERT:
mode = "upsert";
break;
case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS:
mode = "fixed partial update";
break;
case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS:
mode = "flexible partial update";
break;
}
return fmt::format(
"mode={}, update_cids={}, missing_cids={}, is_strict_mode={}, "
"max_version_in_flush_phase={}",
mode, update_cids.size(), missing_cids.size(), is_strict_mode,
max_version_in_flush_phase);
}
Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema,
const std::function<std::string()>& line,
BitmapValue* skip_bitmap) {
switch (partial_update_new_key_policy) {
case doris::PartialUpdateNewRowPolicyPB::APPEND: {
if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
if (!can_insert_new_rows_in_partial_update) {
std::string error_column;
for (auto cid : missing_cids) {
const TabletColumn& col = tablet_schema.column(cid);
if (!col.has_default_value() && !col.is_nullable() &&
!(tablet_schema.auto_increment_column() == col.name())) {
error_column = col.name();
break;
}
}
return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
"the unmentioned column `{}` should have default value or be nullable "
"for newly inserted rows in non-strict mode partial update",
error_column);
}
} else if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS) {
DCHECK(skip_bitmap != nullptr);
bool can_insert_new_row {true};
std::string error_column;
for (auto cid : missing_cids) {
const TabletColumn& col = tablet_schema.column(cid);
if (skip_bitmap->contains(col.unique_id()) && !col.has_default_value() &&
!col.is_nullable() && !col.is_auto_increment()) {
error_column = col.name();
can_insert_new_row = false;
break;
}
}
if (!can_insert_new_row) {
return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
"the unmentioned column `{}` should have default value or be "
"nullable for newly inserted rows in non-strict mode flexible partial "
"update",
error_column);
}
}
} break;
case doris::PartialUpdateNewRowPolicyPB::ERROR: {
return Status::Error<ErrorCode::NEW_ROWS_IN_PARTIAL_UPDATE, false>(
"Can't append new rows in partial update when partial_update_new_key_behavior is "
"ERROR. Row with key=[{}] is not in table.",
line());
} break;
}
return Status::OK();
}
void PartialUpdateInfo::_generate_default_values_for_missing_cids(
const TabletSchema& tablet_schema) {
for (unsigned int cur_cid : missing_cids) {
const auto& column = tablet_schema.column(cur_cid);
if (column.has_default_value()) {
std::string default_value;
if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) !=
std::string::npos)) {
auto pos = to_lower(column.default_value()).find('(');
if (pos == std::string::npos) {
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.to_string();
} else {
int precision = std::stoi(column.default_value().substr(pos + 1));
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision);
default_value = dtv.to_string();
}
} else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 &&
to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) !=
std::string::npos)) {
DateV2Value<DateV2ValueType> dv;
dv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dv.to_string();
} else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_BITMAP &&
to_lower(column.default_value()).find(to_lower("BITMAP_EMPTY")) !=
std::string::npos)) {
BitmapValue v = BitmapValue {};
default_value = v.to_string();
} else {
default_value = column.default_value();
}
default_values.emplace_back(default_value);
} else {
// place an empty string here
default_values.emplace_back();
}
}
CHECK_EQ(missing_cids.size(), default_values.size());
}
bool FixedReadPlan::empty() const {
return plan.empty();
}
void FixedReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos) {
plan[row_location.rowset_id][row_location.segment_id].emplace_back(row_location.row_id, pos);
}
// read columns by read plan
// read_index: ori_pos-> block_idx
Status FixedReadPlan::read_columns_by_plan(
const TabletSchema& tablet_schema, std::vector<uint32_t> cids_to_read,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, vectorized::Block& block,
std::map<uint32_t, uint32_t>* read_index, bool force_read_old_delete_signs,
const signed char* __restrict cur_delete_signs) const {
if (force_read_old_delete_signs) {
// always read delete sign column from historical data
if (const vectorized::ColumnWithTypeAndName* old_delete_sign_column =
block.try_get_by_name(DELETE_SIGN);
old_delete_sign_column == nullptr) {
auto del_col_cid = tablet_schema.field_index(DELETE_SIGN);
cids_to_read.emplace_back(del_col_cid);
block.swap(tablet_schema.create_block_by_cids(cids_to_read));
}
}
bool has_row_column = tablet_schema.has_row_store_for_all_columns();
auto mutable_columns = block.mutate_columns();
uint32_t read_idx = 0;
for (const auto& [rowset_id, segment_row_mappings] : plan) {
for (const auto& [segment_id, mappings] : segment_row_mappings) {
auto rowset_iter = rsid_to_rowset.find(rowset_id);
CHECK(rowset_iter != rsid_to_rowset.end());
std::vector<uint32_t> rids;
for (auto [rid, pos] : mappings) {
if (cur_delete_signs && cur_delete_signs[pos]) {
continue;
}
rids.emplace_back(rid);
(*read_index)[static_cast<uint32_t>(pos)] = read_idx++;
}
if (has_row_column) {
auto st = BaseTablet::fetch_value_through_row_column(
rowset_iter->second, tablet_schema, segment_id, rids, cids_to_read, block);
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value through row column";
return st;
}
continue;
}
for (size_t cid = 0; cid < mutable_columns.size(); ++cid) {
TabletColumn tablet_column = tablet_schema.column(cids_to_read[cid]);
auto st = doris::BaseTablet::fetch_value_by_rowids(
rowset_iter->second, segment_id, rids, tablet_column, mutable_columns[cid]);
// set read value to output block
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value";
return st;
}
}
}
}
block.set_columns(std::move(mutable_columns));
return Status::OK();
}
Status FixedReadPlan::fill_missing_columns(
RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
const TabletSchema& tablet_schema, vectorized::Block& full_block,
const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
uint32_t segment_start_pos, const vectorized::Block* block) const {
auto mutable_full_columns = full_block.mutate_columns();
// create old value columns
const auto& missing_cids = rowset_ctx->partial_update_info->missing_cids;
bool have_input_seq_column = false;
if (tablet_schema.has_sequence_col()) {
const std::vector<uint32_t>& including_cids = rowset_ctx->partial_update_info->update_cids;
have_input_seq_column =
(std::find(including_cids.cbegin(), including_cids.cend(),
tablet_schema.sequence_col_idx()) != including_cids.cend());
}
auto old_value_block = tablet_schema.create_block_by_cids(missing_cids);
CHECK_EQ(missing_cids.size(), old_value_block.columns());
// segment pos to write -> rowid to read in old_value_block
std::map<uint32_t, uint32_t> read_index;
RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, missing_cids, rsid_to_rowset,
old_value_block, &read_index, true, nullptr));
const auto* old_delete_signs = BaseTablet::get_delete_sign_column_data(old_value_block);
DCHECK(old_delete_signs != nullptr);
// build default value columns
auto default_value_block = old_value_block.clone_empty();
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
tablet_schema, missing_cids, rowset_ctx->partial_update_info->default_values,
old_value_block, default_value_block));
auto mutable_default_value_columns = default_value_block.mutate_columns();
// fill all missing value from mutable_old_columns, need to consider default value and null value
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
auto segment_pos = idx + segment_start_pos;
auto pos_in_old_block = read_index[segment_pos];
for (auto i = 0; i < missing_cids.size(); ++i) {
// if the column has default value, fill it with default value
// otherwise, if the column is nullable, fill it with null value
const auto& tablet_column = tablet_schema.column(missing_cids[i]);
auto& missing_col = mutable_full_columns[missing_cids[i]];
bool should_use_default = use_default_or_null_flag[idx];
if (!should_use_default) {
bool old_row_delete_sign =
(old_delete_signs != nullptr && old_delete_signs[pos_in_old_block] != 0);
if (old_row_delete_sign) {
if (!tablet_schema.has_sequence_col()) {
should_use_default = true;
} else if (have_input_seq_column || (!tablet_column.is_seqeunce_col())) {
// to keep the sequence column value not decreasing, we should read values of seq column
// from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise
// it may cause the merge-on-read based compaction to produce incorrect results
should_use_default = true;
}
}
}
if (should_use_default) {
// clang-format off
if (tablet_column.has_default_value()) {
missing_col->insert_from(*mutable_default_value_columns[i], 0);
} else if (tablet_column.is_nullable()) {
auto* nullable_column = assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(missing_col.get());
nullable_column->insert_many_defaults(1);
} else if (tablet_schema.auto_increment_column() == tablet_column.name()) {
const auto& column = *DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name()));
DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
auto* auto_inc_column =
assert_cast<vectorized::ColumnInt64*, TypeCheckOnRelease::DISABLE>(missing_col.get());
auto_inc_column->insert(vectorized::Field::create_field<TYPE_BIGINT>(
assert_cast<const vectorized::ColumnInt64*, TypeCheckOnRelease::DISABLE>(
block->get_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL).column.get())->get_element(idx)));
} else {
// If the control flow reaches this branch, the column neither has default value
// nor is nullable. It means that the row's delete sign is marked, and the value
// columns are useless and won't be read. So we can just put arbitary values in the cells
missing_col->insert(tablet_column.get_vec_type()->get_default());
}
// clang-format on
} else {
missing_col->insert_from(*old_value_block.get_by_position(i).column,
pos_in_old_block);
}
}
}
full_block.set_columns(std::move(mutable_full_columns));
return Status::OK();
}
void FlexibleReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos,
const BitmapValue& skip_bitmap) {
if (!use_row_store) {
for (uint64_t col_uid : skip_bitmap) {
plan[row_location.rowset_id][row_location.segment_id][static_cast<uint32_t>(col_uid)]
.emplace_back(row_location.row_id, pos);
}
} else {
row_store_plan[row_location.rowset_id][row_location.segment_id].emplace_back(
row_location.row_id, pos);
}
}
Status FlexibleReadPlan::read_columns_by_plan(
const TabletSchema& tablet_schema,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block& old_value_block,
std::map<uint32_t, std::map<uint32_t, uint32_t>>* read_index) const {
auto mutable_columns = old_value_block.mutate_columns();
// cid -> next rid to fill in block
std::map<uint32_t, uint32_t> next_read_idx;
for (uint32_t cid {0}; cid < tablet_schema.num_columns(); cid++) {
next_read_idx[cid] = 0;
}
for (const auto& [rowset_id, segment_mappings] : plan) {
for (const auto& [segment_id, uid_mappings] : segment_mappings) {
for (const auto& [col_uid, mappings] : uid_mappings) {
auto rowset_iter = rsid_to_rowset.find(rowset_id);
CHECK(rowset_iter != rsid_to_rowset.end());
auto cid = tablet_schema.field_index(col_uid);
DCHECK_NE(cid, -1);
DCHECK_GE(cid, tablet_schema.num_key_columns());
std::vector<uint32_t> rids;
for (auto [rid, pos] : mappings) {
rids.emplace_back(rid);
(*read_index)[cid][static_cast<uint32_t>(pos)] = next_read_idx[cid]++;
}
TabletColumn tablet_column = tablet_schema.column(cid);
auto idx = cid - tablet_schema.num_key_columns();
RETURN_IF_ERROR(doris::BaseTablet::fetch_value_by_rowids(
rowset_iter->second, segment_id, rids, tablet_column,
mutable_columns[idx]));
}
}
}
// !!!ATTENTION!!!: columns in block may have different size because every row has different columns to update
old_value_block.set_columns(std::move(mutable_columns));
return Status::OK();
}
Status FlexibleReadPlan::read_columns_by_plan(
const TabletSchema& tablet_schema, const std::vector<uint32_t>& cids_to_read,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block& old_value_block, std::map<uint32_t, uint32_t>* read_index) const {
DCHECK(use_row_store);
uint32_t read_idx = 0;
for (const auto& [rowset_id, segment_row_mappings] : row_store_plan) {
for (const auto& [segment_id, mappings] : segment_row_mappings) {
auto rowset_iter = rsid_to_rowset.find(rowset_id);
CHECK(rowset_iter != rsid_to_rowset.end());
std::vector<uint32_t> rids;
for (auto [rid, pos] : mappings) {
rids.emplace_back(rid);
(*read_index)[static_cast<uint32_t>(pos)] = read_idx++;
}
auto st = BaseTablet::fetch_value_through_row_column(rowset_iter->second, tablet_schema,
segment_id, rids, cids_to_read,
old_value_block);
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value through row column";
return st;
}
}
}
return Status::OK();
}
Status FlexibleReadPlan::fill_non_primary_key_columns(
RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
const TabletSchema& tablet_schema, vectorized::Block& full_block,
const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
uint32_t segment_start_pos, uint32_t block_start_pos, const vectorized::Block* block,
std::vector<BitmapValue>* skip_bitmaps) const {
auto mutable_full_columns = full_block.mutate_columns();
// missing_cids are all non sort key columns' cids
const auto& non_sort_key_cids = rowset_ctx->partial_update_info->missing_cids;
auto old_value_block = tablet_schema.create_block_by_cids(non_sort_key_cids);
CHECK_EQ(non_sort_key_cids.size(), old_value_block.columns());
if (!use_row_store) {
RETURN_IF_ERROR(fill_non_primary_key_columns_for_column_store(
rowset_ctx, rsid_to_rowset, tablet_schema, non_sort_key_cids, old_value_block,
mutable_full_columns, use_default_or_null_flag, has_default_or_nullable,
segment_start_pos, block_start_pos, block, skip_bitmaps));
} else {
RETURN_IF_ERROR(fill_non_primary_key_columns_for_row_store(
rowset_ctx, rsid_to_rowset, tablet_schema, non_sort_key_cids, old_value_block,
mutable_full_columns, use_default_or_null_flag, has_default_or_nullable,
segment_start_pos, block_start_pos, block, skip_bitmaps));
}
full_block.set_columns(std::move(mutable_full_columns));
return Status::OK();
}
Status FlexibleReadPlan::fill_non_primary_key_columns_for_column_store(
RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids,
vectorized::Block& old_value_block, vectorized::MutableColumns& mutable_full_columns,
const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
uint32_t segment_start_pos, uint32_t block_start_pos, const vectorized::Block* block,
std::vector<BitmapValue>* skip_bitmaps) const {
auto* info = rowset_ctx->partial_update_info.get();
int32_t seq_col_unique_id = -1;
if (tablet_schema.has_sequence_col()) {
seq_col_unique_id = tablet_schema.column(tablet_schema.sequence_col_idx()).unique_id();
}
// cid -> segment pos to write -> rowid to read in old_value_block
std::map<uint32_t, std::map<uint32_t, uint32_t>> read_index;
RETURN_IF_ERROR(
read_columns_by_plan(tablet_schema, rsid_to_rowset, old_value_block, &read_index));
// !!!ATTENTION!!!: columns in old_value_block may have different size because every row has different columns to update
const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);
// build default value columns
auto default_value_block = old_value_block.clone_empty();
if (has_default_or_nullable || delete_sign_column_data != nullptr) {
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
tablet_schema, non_sort_key_cids, info->default_values, old_value_block,
default_value_block));
}
auto fill_one_cell = [&tablet_schema, &read_index, info](
const TabletColumn& tablet_column, uint32_t cid,
vectorized::MutableColumnPtr& new_col,
const vectorized::IColumn& default_value_col,
const vectorized::IColumn& old_value_col,
const vectorized::IColumn& cur_col, std::size_t block_pos,
uint32_t segment_pos, bool skipped, bool row_has_sequence_col,
bool use_default, const signed char* delete_sign_column_data) {
if (skipped) {
DCHECK(cid != tablet_schema.skip_bitmap_col_idx());
DCHECK(cid != tablet_schema.version_col_idx());
DCHECK(!tablet_column.is_row_store_column());
if (!use_default) {
if (delete_sign_column_data != nullptr) {
bool old_row_delete_sign = false;
if (auto it = read_index[tablet_schema.delete_sign_idx()].find(segment_pos);
it != read_index[tablet_schema.delete_sign_idx()].end()) {
old_row_delete_sign = (delete_sign_column_data[it->second] != 0);
}
if (old_row_delete_sign) {
if (!tablet_schema.has_sequence_col()) {
use_default = true;
} else if (row_has_sequence_col ||
(!tablet_column.is_seqeunce_col() &&
(tablet_column.unique_id() != info->sequence_map_col_uid()))) {
// to keep the sequence column value not decreasing, we should read values of seq column(and seq map column)
// from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise
// it may cause the merge-on-read based compaction to produce incorrect results
use_default = true;
}
}
}
}
if (!use_default && tablet_column.is_on_update_current_timestamp()) {
use_default = true;
}
if (use_default) {
if (tablet_column.has_default_value()) {
new_col->insert_from(default_value_col, 0);
} else if (tablet_column.is_nullable()) {
assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
new_col.get())
->insert_many_defaults(1);
} else if (tablet_column.is_auto_increment()) {
// In flexible partial update, the skip bitmap indicates whether a cell
// is specified in the original load, so the generated auto-increment value is filled
// in current block in place if needed rather than using a seperate column to
// store the generated auto-increment value in fixed partial update
new_col->insert_from(cur_col, block_pos);
} else {
new_col->insert(tablet_column.get_vec_type()->get_default());
}
} else {
auto pos_in_old_block = read_index.at(cid).at(segment_pos);
new_col->insert_from(old_value_col, pos_in_old_block);
}
} else {
new_col->insert_from(cur_col, block_pos);
}
};
// fill all non sort key columns from mutable_old_columns, need to consider default value and null value
for (std::size_t i {0}; i < non_sort_key_cids.size(); i++) {
auto cid = non_sort_key_cids[i];
const auto& tablet_column = tablet_schema.column(cid);
auto col_uid = tablet_column.unique_id();
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
auto segment_pos = segment_start_pos + idx;
auto block_pos = block_start_pos + idx;
fill_one_cell(tablet_column, cid, mutable_full_columns[cid],
*default_value_block.get_by_position(i).column,
*old_value_block.get_by_position(i).column,
*block->get_by_position(cid).column, block_pos, segment_pos,
skip_bitmaps->at(block_pos).contains(col_uid),
tablet_schema.has_sequence_col()
? !skip_bitmaps->at(block_pos).contains(seq_col_unique_id)
: false,
use_default_or_null_flag[idx], delete_sign_column_data);
}
}
return Status::OK();
}
Status FlexibleReadPlan::fill_non_primary_key_columns_for_row_store(
RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids,
vectorized::Block& old_value_block, vectorized::MutableColumns& mutable_full_columns,
const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
uint32_t segment_start_pos, uint32_t block_start_pos, const vectorized::Block* block,
std::vector<BitmapValue>* skip_bitmaps) const {
auto* info = rowset_ctx->partial_update_info.get();
int32_t seq_col_unique_id = -1;
if (tablet_schema.has_sequence_col()) {
seq_col_unique_id = tablet_schema.column(tablet_schema.sequence_col_idx()).unique_id();
}
// segment pos to write -> rowid to read in old_value_block
std::map<uint32_t, uint32_t> read_index;
RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, non_sort_key_cids, rsid_to_rowset,
old_value_block, &read_index));
const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);
// build default value columns
auto default_value_block = old_value_block.clone_empty();
if (has_default_or_nullable || delete_sign_column_data != nullptr) {
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
tablet_schema, non_sort_key_cids, info->default_values, old_value_block,
default_value_block));
}
auto fill_one_cell = [&tablet_schema, info](const TabletColumn& tablet_column, uint32_t cid,
vectorized::MutableColumnPtr& new_col,
const vectorized::IColumn& default_value_col,
const vectorized::IColumn& old_value_col,
const vectorized::IColumn& cur_col,
std::size_t block_pos, bool skipped,
bool row_has_sequence_col, bool use_default,
const signed char* delete_sign_column_data,
uint32_t pos_in_old_block) {
if (skipped) {
DCHECK(cid != tablet_schema.skip_bitmap_col_idx());
DCHECK(cid != tablet_schema.version_col_idx());
DCHECK(!tablet_column.is_row_store_column());
if (!use_default) {
if (delete_sign_column_data != nullptr) {
bool old_row_delete_sign = (delete_sign_column_data[pos_in_old_block] != 0);
if (old_row_delete_sign) {
if (!tablet_schema.has_sequence_col()) {
use_default = true;
} else if (row_has_sequence_col ||
(!tablet_column.is_seqeunce_col() &&
(tablet_column.unique_id() != info->sequence_map_col_uid()))) {
// to keep the sequence column value not decreasing, we should read values of seq column(and seq map column)
// from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise
// it may cause the merge-on-read based compaction to produce incorrect results
use_default = true;
}
}
}
}
if (!use_default && tablet_column.is_on_update_current_timestamp()) {
use_default = true;
}
if (use_default) {
if (tablet_column.has_default_value()) {
new_col->insert_from(default_value_col, 0);
} else if (tablet_column.is_nullable()) {
assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
new_col.get())
->insert_many_defaults(1);
} else if (tablet_column.is_auto_increment()) {
// In flexible partial update, the skip bitmap indicates whether a cell
// is specified in the original load, so the generated auto-increment value is filled
// in current block in place if needed rather than using a seperate column to
// store the generated auto-increment value in fixed partial update
new_col->insert_from(cur_col, block_pos);
} else {
new_col->insert(tablet_column.get_vec_type()->get_default());
}
} else {
new_col->insert_from(old_value_col, pos_in_old_block);
}
} else {
new_col->insert_from(cur_col, block_pos);
}
};
// fill all non sort key columns from mutable_old_columns, need to consider default value and null value
for (std::size_t i {0}; i < non_sort_key_cids.size(); i++) {
auto cid = non_sort_key_cids[i];
const auto& tablet_column = tablet_schema.column(cid);
auto col_uid = tablet_column.unique_id();
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
auto segment_pos = segment_start_pos + idx;
auto block_pos = block_start_pos + idx;
auto pos_in_old_block = read_index[segment_pos];
fill_one_cell(tablet_column, cid, mutable_full_columns[cid],
*default_value_block.get_by_position(i).column,
*old_value_block.get_by_position(i).column,
*block->get_by_position(cid).column, block_pos,
skip_bitmaps->at(block_pos).contains(col_uid),
tablet_schema.has_sequence_col()
? !skip_bitmaps->at(block_pos).contains(seq_col_unique_id)
: false,
use_default_or_null_flag[idx], delete_sign_column_data, pos_in_old_block);
}
}
return Status::OK();
}
BlockAggregator::BlockAggregator(segment_v2::VerticalSegmentWriter& vertical_segment_writer)
: _writer(vertical_segment_writer), _tablet_schema(*_writer._tablet_schema) {}
void BlockAggregator::merge_one_row(vectorized::MutableBlock& dst_block,
vectorized::Block* src_block, int rid,
BitmapValue& skip_bitmap) {
for (size_t cid {_tablet_schema.num_key_columns()}; cid < _tablet_schema.num_columns(); cid++) {
if (cid == _tablet_schema.skip_bitmap_col_idx()) {
auto& cur_skip_bitmap =
assert_cast<vectorized::ColumnBitmap*>(dst_block.mutable_columns()[cid].get())
->get_data()
.back();
const auto& new_row_skip_bitmap =
assert_cast<vectorized::ColumnBitmap*>(
src_block->get_by_position(cid).column->assume_mutable().get())
->get_data()[rid];
cur_skip_bitmap &= new_row_skip_bitmap;
continue;
}
if (!skip_bitmap.contains(_tablet_schema.column(cid).unique_id())) {
dst_block.mutable_columns()[cid]->pop_back(1);
dst_block.mutable_columns()[cid]->insert_from(*src_block->get_by_position(cid).column,
rid);
}
}
VLOG_DEBUG << fmt::format("merge a row, after merge, output_block.rows()={}, state: {}",
dst_block.rows(), _state.to_string());
}
void BlockAggregator::append_one_row(vectorized::MutableBlock& dst_block,
vectorized::Block* src_block, int rid) {
dst_block.add_row(src_block, rid);
_state.rows++;
VLOG_DEBUG << fmt::format("append a new row, after append, output_block.rows()={}, state: {}",
dst_block.rows(), _state.to_string());
}
void BlockAggregator::remove_last_n_rows(vectorized::MutableBlock& dst_block, int n) {
if (n > 0) {
for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) {
DCHECK_GE(dst_block.mutable_columns()[cid]->size(), n);
dst_block.mutable_columns()[cid]->pop_back(n);
}
}
}
void BlockAggregator::append_or_merge_row(vectorized::MutableBlock& dst_block,
vectorized::Block* src_block, int rid,
BitmapValue& skip_bitmap, bool have_delete_sign) {
if (have_delete_sign) {
// remove all the previous batched rows
remove_last_n_rows(dst_block, _state.rows);
_state.rows = 0;
_state.has_row_with_delete_sign = true;
append_one_row(dst_block, src_block, rid);
} else {
if (_state.should_merge()) {
merge_one_row(dst_block, src_block, rid, skip_bitmap);
} else {
append_one_row(dst_block, src_block, rid);
}
}
};
Status BlockAggregator::aggregate_rows(
vectorized::MutableBlock& output_block, vectorized::Block* block, int start, int end,
std::string key, std::vector<BitmapValue>* skip_bitmaps, const signed char* delete_signs,
vectorized::IOlapColumnDataAccessor* seq_column,
const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
VLOG_DEBUG << fmt::format("merge rows in range=[{}-{})", start, end);
if (end - start == 1) {
output_block.add_row(block, start);
VLOG_DEBUG << fmt::format("append a row directly, rid={}", start);
return Status::OK();
}
auto seq_col_unique_id = _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
auto delete_sign_col_unique_id =
_tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
_state.reset();
RowLocation loc;
RowsetSharedPtr rowset;
std::string previous_encoded_seq_value {};
Status st = _writer._tablet->lookup_row_key(
key, &_tablet_schema, false, specified_rowsets, &loc, _writer._mow_context->max_version,
segment_caches, &rowset, true, &previous_encoded_seq_value);
int pos = start;
bool is_expected_st = (st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok());
DCHECK(is_expected_st || st.is<ErrorCode::MEM_LIMIT_EXCEEDED>())
<< "[BlockAggregator::aggregate_rows] unexpected error status while lookup_row_key:"
<< st;
if (!is_expected_st) {
return st;
}
std::string cur_seq_val;
if (st.ok()) {
for (pos = start; pos < end; pos++) {
auto& skip_bitmap = skip_bitmaps->at(pos);
bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
// Discard all the rows whose seq value is smaller than previous_encoded_seq_value.
if (row_has_sequence_col) {
std::string seq_val {};
_writer._encode_seq_column(seq_column, pos, &seq_val);
if (Slice {seq_val}.compare(Slice {previous_encoded_seq_value}) < 0) {
continue;
}
cur_seq_val = std::move(seq_val);
break;
}
cur_seq_val = std::move(previous_encoded_seq_value);
break;
}
} else {
pos = start;
auto& skip_bitmap = skip_bitmaps->at(pos);
bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
if (row_has_sequence_col) {
std::string seq_val {};
// for rows that don't specify seqeunce col, seq_val will be encoded to minial value
_writer._encode_seq_column(seq_column, pos, &seq_val);
cur_seq_val = std::move(seq_val);
} else {
cur_seq_val.clear();
RETURN_IF_ERROR(_writer._generate_encoded_default_seq_value(
_tablet_schema, *_writer._opts.rowset_ctx->partial_update_info, &cur_seq_val));
}
}
for (int rid {pos}; rid < end; rid++) {
auto& skip_bitmap = skip_bitmaps->at(rid);
bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
bool have_delete_sign =
(!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[rid] != 0);
if (!row_has_sequence_col) {
append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign);
} else {
std::string seq_val {};
_writer._encode_seq_column(seq_column, rid, &seq_val);
if (Slice {seq_val}.compare(Slice {cur_seq_val}) >= 0) {
append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign);
cur_seq_val = std::move(seq_val);
} else {
VLOG_DEBUG << fmt::format(
"skip rid={} becasue its seq value is lower than the previous", rid);
}
}
}
return Status::OK();
};
Status BlockAggregator::aggregate_for_sequence_column(
vectorized::Block* block, int num_rows,
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
vectorized::IOlapColumnDataAccessor* seq_column,
const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
// the process logic here is the same as MemTable::_aggregate_for_flexible_partial_update_without_seq_col()
// after this function, there will be at most 2 rows for a specified key
std::vector<BitmapValue>* skip_bitmaps =
&(assert_cast<vectorized::ColumnBitmap*>(
block->get_by_position(_tablet_schema.skip_bitmap_col_idx())
.column->assume_mutable()
.get())
->get_data());
const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows);
auto filtered_block = _tablet_schema.create_block();
vectorized::MutableBlock output_block =
vectorized::MutableBlock::build_mutable_block(&filtered_block);
int same_key_rows {0};
std::string previous_key {};
for (int block_pos {0}; block_pos < num_rows; block_pos++) {
std::string key = _writer._full_encode_keys(key_columns, block_pos);
if (block_pos > 0 && previous_key == key) {
same_key_rows++;
} else {
if (same_key_rows > 0) {
RETURN_IF_ERROR(aggregate_rows(output_block, block, block_pos - same_key_rows,
block_pos, std::move(previous_key), skip_bitmaps,
delete_signs, seq_column, specified_rowsets,
segment_caches));
}
same_key_rows = 1;
}
previous_key = std::move(key);
}
if (same_key_rows > 0) {
RETURN_IF_ERROR(aggregate_rows(output_block, block, num_rows - same_key_rows, num_rows,
std::move(previous_key), skip_bitmaps, delete_signs,
seq_column, specified_rowsets, segment_caches));
}
block->swap(output_block.to_block());
return Status::OK();
}
Status BlockAggregator::fill_sequence_column(vectorized::Block* block, size_t num_rows,
const FixedReadPlan& read_plan,
std::vector<BitmapValue>& skip_bitmaps) {
DCHECK(_tablet_schema.has_sequence_col());
std::vector<uint32_t> cids {static_cast<uint32_t>(_tablet_schema.sequence_col_idx())};
auto seq_col_unique_id = _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
auto seq_col_block = _tablet_schema.create_block_by_cids(cids);
auto tmp_block = _tablet_schema.create_block_by_cids(cids);
std::map<uint32_t, uint32_t> read_index;
RETURN_IF_ERROR(read_plan.read_columns_by_plan(_tablet_schema, cids, _writer._rsid_to_rowset,
seq_col_block, &read_index, false));
auto new_seq_col_ptr = tmp_block.get_by_position(0).column->assume_mutable();
const auto& old_seq_col_ptr = *seq_col_block.get_by_position(0).column;
const auto& cur_seq_col_ptr = *block->get_by_position(_tablet_schema.sequence_col_idx()).column;
for (uint32_t block_pos {0}; block_pos < num_rows; block_pos++) {
if (read_index.contains(block_pos)) {
new_seq_col_ptr->insert_from(old_seq_col_ptr, read_index[block_pos]);
skip_bitmaps[block_pos].remove(seq_col_unique_id);
} else {
new_seq_col_ptr->insert_from(cur_seq_col_ptr, block_pos);
}
}
block->replace_by_position(_tablet_schema.sequence_col_idx(), std::move(new_seq_col_ptr));
return Status::OK();
}
Status BlockAggregator::aggregate_for_insert_after_delete(
vectorized::Block* block, size_t num_rows,
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
// there will be at most 2 rows for a specified key in block when control flow reaches here
// after this function, there will not be duplicate rows in block
std::vector<BitmapValue>* skip_bitmaps =
&(assert_cast<vectorized::ColumnBitmap*>(
block->get_by_position(_tablet_schema.skip_bitmap_col_idx())
.column->assume_mutable()
.get())
->get_data());
const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows);
auto filter_column = vectorized::ColumnUInt8::create(num_rows, 1);
auto* __restrict filter_map = filter_column->get_data().data();
std::string previous_key {};
bool previous_has_delete_sign {false};
int duplicate_rows {0};
int32_t delete_sign_col_unique_id =
_tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
auto seq_col_unique_id =
(_tablet_schema.sequence_col_idx() != -1)
? _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id()
: -1;
FixedReadPlan read_plan;
for (size_t block_pos {0}; block_pos < num_rows; block_pos++) {
size_t delta_pos = block_pos;
auto& skip_bitmap = skip_bitmaps->at(block_pos);
std::string key = _writer._full_encode_keys(key_columns, delta_pos);
bool have_delete_sign =
(!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[block_pos] != 0);
if (delta_pos > 0 && previous_key == key) {
// !!ATTENTION!!: We can only remove the row with delete sign if there is a insert with the same key after this row.
// If there is only a row with delete sign, we should keep it and can't remove it from block, because
// compaction will not use the delete bitmap when reading data. So there may still be rows with delete sign
// in later process
DCHECK(previous_has_delete_sign);
DCHECK(!have_delete_sign);
++duplicate_rows;
RowLocation loc;
RowsetSharedPtr rowset;
Status st = _writer._tablet->lookup_row_key(
key, &_tablet_schema, false, specified_rowsets, &loc,
_writer._mow_context->max_version, segment_caches, &rowset, true);
bool is_expected_st = (st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok());
DCHECK(is_expected_st || st.is<ErrorCode::MEM_LIMIT_EXCEEDED>())
<< "[BlockAggregator::aggregate_for_insert_after_delete] unexpected error "
"status while lookup_row_key:"
<< st;
if (!is_expected_st) {
return st;
}
Slice previous_seq_slice {};
if (st.ok()) {
if (_tablet_schema.has_sequence_col()) {
// if the insert row doesn't specify the sequence column, we need to
// read the historical's sequence column value so that we don't need
// to handle seqeunce column in append_block_with_flexible_content()
// for this row
bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
if (!row_has_sequence_col) {
read_plan.prepare_to_read(loc, block_pos);
_writer._rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
}
}
// delete the existing row
_writer._mow_context->delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
}
// and remove the row with delete sign from the current block
filter_map[block_pos - 1] = 0;
}
previous_has_delete_sign = have_delete_sign;
previous_key = std::move(key);
}
if (duplicate_rows > 0) {
if (!read_plan.empty()) {
// fill sequence column value for some rows
RETURN_IF_ERROR(fill_sequence_column(block, num_rows, read_plan, *skip_bitmaps));
}
RETURN_IF_ERROR(filter_block(block, num_rows, std::move(filter_column), duplicate_rows,
"__filter_insert_after_delete_col__"));
}
return Status::OK();
}
Status BlockAggregator::filter_block(vectorized::Block* block, size_t num_rows,
vectorized::MutableColumnPtr filter_column, int duplicate_rows,
std::string col_name) {
auto num_cols = block->columns();
block->insert(
{std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(), col_name});
RETURN_IF_ERROR(vectorized::Block::filter_block(block, num_cols, num_cols));
DCHECK_EQ(num_cols, block->columns());
size_t merged_rows = num_rows - block->rows();
if (duplicate_rows != merged_rows) {
auto msg = fmt::format(
"filter_block_for_flexible_partial_update {}: duplicate_rows != merged_rows, "
"duplicate_keys={}, merged_rows={}, num_rows={}, mutable_block->rows()={}",
col_name, duplicate_rows, merged_rows, num_rows, block->rows());
DCHECK(false) << msg;
return Status::InternalError<false>(msg);
}
return Status::OK();
}
Status BlockAggregator::convert_pk_columns(
vectorized::Block* block, size_t row_pos, size_t num_rows,
std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns) {
key_columns.clear();
for (uint32_t cid {0}; cid < _tablet_schema.num_key_columns(); cid++) {
RETURN_IF_ERROR(_writer._olap_data_convertor->set_source_content_with_specifid_column(
block->get_by_position(cid), row_pos, num_rows, cid));
auto [status, column] = _writer._olap_data_convertor->convert_column_data(cid);
if (!status.ok()) {
return status;
}
key_columns.push_back(column);
}
return Status::OK();
}
Status BlockAggregator::convert_seq_column(vectorized::Block* block, size_t row_pos,
size_t num_rows,
vectorized::IOlapColumnDataAccessor*& seq_column) {
seq_column = nullptr;
if (_tablet_schema.has_sequence_col()) {
auto seq_col_idx = _tablet_schema.sequence_col_idx();
RETURN_IF_ERROR(_writer._olap_data_convertor->set_source_content_with_specifid_column(
block->get_by_position(seq_col_idx), row_pos, num_rows, seq_col_idx));
auto [status, column] = _writer._olap_data_convertor->convert_column_data(seq_col_idx);
if (!status.ok()) {
return status;
}
seq_column = column;
}
return Status::OK();
};
Status BlockAggregator::aggregate_for_flexible_partial_update(
vectorized::Block* block, size_t num_rows,
const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
std::vector<vectorized::IOlapColumnDataAccessor*> key_columns {};
vectorized::IOlapColumnDataAccessor* seq_column {nullptr};
RETURN_IF_ERROR(convert_pk_columns(block, 0, num_rows, key_columns));
RETURN_IF_ERROR(convert_seq_column(block, 0, num_rows, seq_column));
// 1. merge duplicate rows when table has sequence column
// When there are multiple rows with the same keys in memtable, some of them specify specify the sequence column,
// some of them don't. We can't do the de-duplication in memtable because we don't know the historical data. We must
// de-duplicate them here.
if (_tablet_schema.has_sequence_col()) {
RETURN_IF_ERROR(aggregate_for_sequence_column(block, static_cast<int>(num_rows),
key_columns, seq_column, specified_rowsets,
segment_caches));
}
// 2. merge duplicate rows and handle insert after delete
if (block->rows() != num_rows) {
num_rows = block->rows();
// data in block has changed, should re-encode key columns, sequence column
_writer._olap_data_convertor->clear_source_content();
RETURN_IF_ERROR(convert_pk_columns(block, 0, num_rows, key_columns));
RETURN_IF_ERROR(convert_seq_column(block, 0, num_rows, seq_column));
}
RETURN_IF_ERROR(aggregate_for_insert_after_delete(block, num_rows, key_columns,
specified_rowsets, segment_caches));
return Status::OK();
}
} // namespace doris