blob: df1dcef0e3f523ff664f454507c730a89e54732a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/schema_change.h"
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <algorithm>
#include <exception>
#include <map>
#include <memory>
#include <mutex>
#include <roaring/roaring.hh>
#include <tuple>
#include <utility>
#include "agent/be_exec_version_manager.h"
#include "cloud/cloud_schema_change_job.h"
#include "cloud/config.h"
#include "common/consts.h"
#include "common/logging.h"
#include "common/signal_handler.h"
#include "common/status.h"
#include "exec/schema_scanner/schema_metadata_name_ids_scanner.h"
#include "io/fs/file_system.h"
#include "io/io_common.h"
#include "olap/base_tablet.h"
#include "olap/data_dir.h"
#include "olap/delete_handler.h"
#include "olap/field.h"
#include "olap/iterators.h"
#include "olap/merger.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/pending_rowset_helper.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_reader_context.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "olap/segment_loader.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "olap/wrapper_field.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/trace.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/aggregate_functions/aggregate_function_reader.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/olap/olap_data_convertor.h"
namespace doris {
class CollectionValue;
using namespace ErrorCode;
constexpr int ALTER_TABLE_BATCH_SIZE = 4064;
// Merges a set of in-memory blocks (already converted to the new tablet
// schema) into one sorted stream and writes it through a RowsetWriter.
// Rows are ordered by the tablet's key columns; how duplicate keys are
// handled depends on the tablet's keys type (AGG / UNIQUE / DUP).
class MultiBlockMerger {
public:
    MultiBlockMerger(BaseTabletSPtr tablet) : _tablet(tablet), _cmp(*tablet) {}

    // Sorts every row of `blocks` and writes the merged result to
    // `rowset_writer`. `merged_rows` is increased by the number of input rows
    // that were folded away (aggregated or deduplicated) rather than written.
    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
        int rows = 0;
        for (const auto& block : blocks) {
            rows += block->rows();
        }
        if (!rows) {
            return Status::OK();
        }

        // Build lightweight (block, row-index) references so sorting does not
        // move any row data around.
        std::vector<RowRef> row_refs;
        row_refs.reserve(rows);
        for (const auto& block : blocks) {
            for (uint16_t i = 0; i < block->rows(); i++) {
                row_refs.emplace_back(block.get(), i);
            }
        }
        // TODO: try to use pdqsort to replace std::sort
        // The block version is incremental.
        std::stable_sort(row_refs.begin(), row_refs.end(), _cmp);

        auto finalized_block = _tablet->tablet_schema()->create_block();
        int columns = finalized_block.columns();
        *merged_rows += rows;

        if (_tablet->keys_type() == KeysType::AGG_KEYS) {
            auto tablet_schema = _tablet->tablet_schema();
            int key_number = _tablet->num_key_columns();

            std::vector<vectorized::AggregateFunctionPtr> agg_functions;
            std::vector<vectorized::AggregateDataPtr> agg_places;

            // Set up one aggregate function plus its state buffer per value
            // column (columns after the key columns).
            for (int i = key_number; i < columns; i++) {
                try {
                    vectorized::AggregateFunctionPtr function =
                            tablet_schema->column(i).get_aggregate_function(
                                    vectorized::AGG_LOAD_SUFFIX,
                                    tablet_schema->column(i).get_be_exec_version());
                    if (!function) {
                        return Status::InternalError(
                                "could not find aggregate function on column {}, aggregation={}",
                                tablet_schema->column(i).name(),
                                tablet_schema->column(i).aggregation());
                    }
                    agg_functions.push_back(function);
                    // create aggregate data
                    auto* place = new char[function->size_of_data()];
                    function->create(place);
                    agg_places.push_back(place);
                } catch (...) {
                    // Destroy the states created so far before rethrowing, so
                    // nothing leaks on a failed setup.
                    for (int j = 0; j < i - key_number; ++j) {
                        agg_functions[j]->destroy(agg_places[j]);
                        delete[] agg_places[j];
                    }
                    throw;
                }
            }
            // Destroy all aggregate states on every exit path below.
            DEFER({
                for (int i = 0; i < columns - key_number; i++) {
                    agg_functions[i]->destroy(agg_places[i]);
                    delete[] agg_places[i];
                }
            });

            for (int i = 0; i < rows; i++) {
                auto row_ref = row_refs[i];
                // Accumulate this row's value columns into the running states.
                for (int j = key_number; j < columns; j++) {
                    const auto* column_ptr = row_ref.get_column(j).get();
                    agg_functions[j - key_number]->add(
                            agg_places[j - key_number],
                            const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position,
                            _arena);
                }
                // Last row of a key group (next row compares unequal, or end of
                // input): emit the key columns and the aggregated values.
                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
                    for (int j = 0; j < key_number; j++) {
                        finalized_block.get_by_position(j).column->assume_mutable()->insert_from(
                                *row_ref.get_column(j), row_ref.position);
                    }
                    for (int j = key_number; j < columns; j++) {
                        agg_functions[j - key_number]->insert_result_into(
                                agg_places[j - key_number],
                                finalized_block.get_by_position(j).column->assume_mutable_ref());
                        agg_functions[j - key_number]->reset(agg_places[j - key_number]);
                    }
                    // Flush the output block once it is full or input is done.
                    if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) {
                        *merged_rows -= finalized_block.rows();
                        RETURN_IF_ERROR(rowset_writer->add_block(&finalized_block));
                        finalized_block.clear_column_data();
                    }
                }
            }
        } else {
            std::vector<RowRef> pushed_row_refs;
            if (_tablet->keys_type() == KeysType::DUP_KEYS) {
                // Duplicate keys: every sorted row is kept.
                std::swap(pushed_row_refs, row_refs);
            } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
                // Unique keys: keep only the last row of each key group (the
                // stable sort preserved block order, which is incremental by
                // version — see the sort comment above).
                for (int i = 0; i < rows; i++) {
                    if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
                        pushed_row_refs.push_back(row_refs[i]);
                    }
                }
                if (!_tablet->tablet_schema()->cluster_key_uids().empty()) {
                    // Map cluster-key unique ids to column indexes.
                    std::vector<uint32_t> ids;
                    for (const auto& cid : _tablet->tablet_schema()->cluster_key_uids()) {
                        auto index = _tablet->tablet_schema()->field_index(cid);
                        if (index == -1) {
                            return Status::InternalError(
                                    "could not find cluster key column with unique_id=" +
                                    std::to_string(cid) + " in tablet schema");
                        }
                        ids.push_back(index);
                    }
                    // sort by cluster key
                    std::stable_sort(pushed_row_refs.begin(), pushed_row_refs.end(),
                                     ClusterKeyRowRefComparator(ids));
                }
            }

            // update real inserted row number
            rows = pushed_row_refs.size();
            *merged_rows -= rows;

            // Copy surviving rows out in batches of ALTER_TABLE_BATCH_SIZE.
            for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) {
                int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i);

                for (int idx = 0; idx < columns; idx++) {
                    auto column = finalized_block.get_by_position(idx).column->assume_mutable();

                    for (int j = 0; j < limit; j++) {
                        auto row_ref = pushed_row_refs[i + j];
                        column->insert_from(*row_ref.get_column(idx), row_ref.position);
                    }
                }
                RETURN_IF_ERROR(rowset_writer->add_block(&finalized_block));
                finalized_block.clear_column_data();
            }
        }
        RETURN_IF_ERROR(rowset_writer->flush());
        return Status::OK();
    }

private:
    // Non-owning reference to a single row inside a source block.
    struct RowRef {
        RowRef(vectorized::Block* block_, uint16_t position_)
                : block(block_), position(position_) {}
        vectorized::ColumnPtr get_column(int index) const {
            return block->get_by_position(index).column;
        }
        const vectorized::Block* block;
        uint16_t position;
    };

    // Orders rows by the tablet's key columns only.
    struct RowRefComparator {
        RowRefComparator(const BaseTablet& tablet) : _num_columns(tablet.num_key_columns()) {}

        int compare(const RowRef& lhs, const RowRef& rhs) const {
            // Notice: does not compare sequence column for mow table
            // read from rowsets with delete bitmap, so there should be no duplicated keys
            return lhs.block->compare_at(lhs.position, rhs.position, _num_columns, *rhs.block, -1);
        }

        bool operator()(const RowRef& lhs, const RowRef& rhs) const {
            return compare(lhs, rhs) < 0;
        }

        const size_t _num_columns;
    };

    // Orders rows by an explicit list of column indexes (the cluster keys).
    struct ClusterKeyRowRefComparator {
        ClusterKeyRowRefComparator(std::vector<uint32_t> columns) : _columns(columns) {}

        int compare(const RowRef& lhs, const RowRef& rhs) const {
            return lhs.block->compare_at(lhs.position, rhs.position, &_columns, *rhs.block, -1);
        }

        bool operator()(const RowRef& lhs, const RowRef& rhs) const {
            return compare(lhs, rhs) < 0;
        }

        const std::vector<uint32_t> _columns;
    };

    BaseTabletSPtr _tablet;
    RowRefComparator _cmp;
    vectorized::Arena _arena;
};
// Prepares one (empty) column-mapping slot per column of the target tablet
// schema; the descriptor table is moved in and kept for later expression
// evaluation in change_block().
BlockChanger::BlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl)
        : _desc_tbl(std::move(desc_tbl)) {
    const auto num_target_columns = tablet_schema->num_columns();
    _schema_mapping.resize(num_target_columns);
}
// Releases the default-value fields owned by the mappings, then drops them.
BlockChanger::~BlockChanger() {
    for (auto& mapping : _schema_mapping) {
        SAFE_DELETE(mapping.default_value);
    }
    _schema_mapping.clear();
}
// Returns the mapping slot for `column_index`, or nullptr when the index is
// out of range.
ColumnMapping* BlockChanger::get_mutable_column_mapping(size_t column_index) {
    return column_index < _schema_mapping.size() ? &_schema_mapping[column_index] : nullptr;
}
// Converts one block from the base schema (`ref_block`) into the new schema
// (`new_block`): applies the optional WHERE filter, evaluates cast/compute
// expressions, fills defaults for brand-new columns, and moves unchanged
// columns over by reference.
Status BlockChanger::change_block(vectorized::Block* ref_block,
                                  vectorized::Block* new_block) const {
    std::unique_ptr<RuntimeState> state = RuntimeState::create_unique();
    state->set_desc_tbl(&_desc_tbl);
    state->set_be_exec_version(_fe_compatible_version);
    RowDescriptor row_desc =
            RowDescriptor(_desc_tbl.get_tuple_descriptor(_desc_tbl.get_row_tuples()[0]), false);

    // Filter the source block in place before any conversion.
    if (_where_expr != nullptr) {
        vectorized::VExprContextSPtr ctx = nullptr;
        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_where_expr, ctx));
        RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc));
        RETURN_IF_ERROR(ctx->open(state.get()));

        RETURN_IF_ERROR(
                vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns()));
    }

    const int row_num = ref_block->rows();
    const int new_schema_cols_num = new_block->columns();

    // will be used for swaping ref_block[entry.first] and new_block[entry.second]
    std::list<std::pair<int, int>> swap_idx_list;
    for (int idx = 0; idx < new_schema_cols_num; idx++) {
        auto expr = _schema_mapping[idx].expr;
        if (expr != nullptr) {
            // Column produced by an expression (e.g. a type cast): evaluate it
            // against the source block; the result lands in a temp column that
            // is appended to ref_block.
            vectorized::VExprContextSPtr ctx;
            RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*expr, ctx));
            RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc));
            RETURN_IF_ERROR(ctx->open(state.get()));

            int result_tmp_column_idx = -1;
            RETURN_IF_ERROR(ctx->execute(ref_block, &result_tmp_column_idx));
            auto& result_tmp_column_def = ref_block->get_by_position(result_tmp_column_idx);
            if (!result_tmp_column_def.column) {
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
                        "result column={} is nullptr, input expr={}", result_tmp_column_def.name,
                        apache::thrift::ThriftDebugString(*expr));
            }
            // A constant result must be materialized to a full column first.
            ref_block->replace_by_position_if_const(result_tmp_column_idx);
            if (result_tmp_column_def.column->size() != row_num) {
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
                        "result size invalid, expect={}, real={}; input expr={}, block={}", row_num,
                        result_tmp_column_def.column->size(),
                        apache::thrift::ThriftDebugString(*expr), ref_block->dump_structure());
            }

            if (_type == SCHEMA_CHANGE) {
                // danger casts (expected to be rejected by upstream caller) may cause data to be null and result in data loss in schema change
                // for rollup, this check is unecessary, and ref columns are not set in this case, it works on exprs

                // column_idx in base schema
                int32_t ref_column_idx = _schema_mapping[idx].ref_column_idx;
                DCHECK_GE(ref_column_idx, 0);
                auto& ref_column_def = ref_block->get_by_position(ref_column_idx);
                RETURN_IF_ERROR(
                        _check_cast_valid(ref_column_def.column, result_tmp_column_def.column));
            }
            swap_idx_list.emplace_back(result_tmp_column_idx, idx);
        } else if (_schema_mapping[idx].ref_column_idx < 0) {
            // new column, write default value
            auto* value = _schema_mapping[idx].default_value;
            auto column = new_block->get_by_position(idx).column->assume_mutable();
            if (value->is_null()) {
                DCHECK(column->is_nullable());
                column->insert_many_defaults(row_num);
            } else {
                auto type_info = get_type_info(_schema_mapping[idx].new_column);
                DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
                                                                value->ptr(), column, row_num);
            }
        } else {
            // same type, just swap column
            swap_idx_list.emplace_back(_schema_mapping[idx].ref_column_idx, idx);
        }
    }

    // Move prepared source columns into the destination block, fixing up
    // nullability mismatches introduced by cast expressions.
    for (auto it : swap_idx_list) {
        auto& ref_col = ref_block->get_by_position(it.first).column;
        auto& new_col = new_block->get_by_position(it.second).column;
        bool ref_col_nullable = ref_col->is_nullable();
        bool new_col_nullable = new_col->is_nullable();

        if (ref_col_nullable != new_col_nullable) {
            // not nullable to nullable
            if (new_col_nullable) {
                auto* new_nullable_col =
                        assert_cast<vectorized::ColumnNullable*>(new_col->assume_mutable().get());

                new_nullable_col->change_nested_column(ref_col);
                // All rows are non-null; resize_fill marks them as such.
                new_nullable_col->get_null_map_data().resize_fill(ref_col->size());
            } else {
                // nullable to not nullable:
                // suppose column `c_phone` is originally varchar(16) NOT NULL,
                // then do schema change `alter table test modify column c_phone int not null`,
                // the cast expr of schema change is `CastExpr(CAST String to Nullable(Int32))`,
                // so need to handle nullable to not nullable here
                auto* ref_nullable_col =
                        assert_cast<vectorized::ColumnNullable*>(ref_col->assume_mutable().get());

                new_col = ref_nullable_col->get_nested_column_ptr();
            }
        } else {
            new_block->get_by_position(it.second).column =
                    ref_block->get_by_position(it.first).column;
        }
    }
    return Status::OK();
}
// This check can prevent schema-change from causing data loss after type cast
//
// Verifies that casting `input_column` to `output_column` did not silently
// change any row's null-ness:
//   - nullable -> non-nullable: no input row may be null;
//   - non-nullable -> nullable: no output row may have become null;
//   - nullable -> nullable: the two null maps must match row for row.
// Returns InternalError on size inconsistencies, DataQualityError when
// null-ness changed, Status::OK() otherwise.
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr input_column,
                                       vectorized::ColumnPtr output_column) {
    if (input_column->size() != output_column->size()) {
        return Status::InternalError(
                "column size is changed, input_column_size={}, output_column_size={}; "
                "input_column={}",
                input_column->size(), output_column->size(), input_column->get_name());
    }
    const size_t num_rows = input_column->size();
    // True when any entry of a null map is set (i.e. some row is null);
    // any_of stops at the first hit instead of scanning the whole column.
    auto has_null = [num_rows](const auto* null_map) {
        return std::any_of(null_map, null_map + num_rows, [](auto v) { return v != 0; });
    };

    if (input_column->is_nullable() != output_column->is_nullable()) {
        if (input_column->is_nullable()) {
            // nullable -> non-nullable: a null input value has nowhere to go.
            const auto* ref_null_map =
                    vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column.get())
                            ->get_null_map_column()
                            .get_data()
                            .data();
            if (has_null(ref_null_map)) {
                return Status::DataQualityError(
                        "some null data is changed to not null, input_column={}",
                        input_column->get_name());
            }
        } else {
            // non-nullable -> nullable: the cast must not have produced nulls.
            const auto* output_nullable =
                    vectorized::check_and_get_column<vectorized::ColumnNullable>(
                            output_column.get());
            const auto& null_map_column = output_nullable->get_null_map_column();
            const auto& nested_column = output_nullable->get_nested_column();
            if (null_map_column.size() != output_column->size()) {
                return Status::InternalError(
                        "null_map_column size mismatch output_column_size, "
                        "null_map_column_size={}, output_column_size={}; input_column={}",
                        null_map_column.size(), output_column->size(), input_column->get_name());
            }
            if (nested_column.size() != output_column->size()) {
                return Status::InternalError(
                        "nested_column size is changed, nested_column_size={}, "
                        "output_column_size={}; input_column={}",
                        nested_column.size(), output_column->size(), input_column->get_name());
            }
            const auto* new_null_map = null_map_column.get_data().data();
            if (has_null(new_null_map)) {
                return Status::DataQualityError(
                        "some not null data is changed to null, input_column={}",
                        input_column->get_name());
            }
        }
    }

    if (input_column->is_nullable() && output_column->is_nullable()) {
        // nullable -> nullable: null maps must be identical after the cast.
        const auto* ref_null_map =
                vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column.get())
                        ->get_null_map_column()
                        .get_data()
                        .data();
        const auto* new_null_map =
                vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column.get())
                        ->get_null_map_column()
                        .get_data()
                        .data();
        if (!std::equal(ref_null_map, ref_null_map + num_rows, new_null_map)) {
            return Status::DataQualityError(
                    "null map is changed after calculation, input_column={}",
                    input_column->get_name());
        }
    }
    return Status::OK();
}
// Hard-links the source rowset into the new tablet instead of rewriting data,
// and for merge-on-write unique tables re-keys the source rowset's delete
// bitmap entries onto the newly written rowset id.
Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
                                   BaseTabletSPtr new_tablet, BaseTabletSPtr base_tablet,
                                   TabletSchemaSPtr base_tablet_schema,
                                   TabletSchemaSPtr new_tablet_schema) {
    if (Status st = rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset());
        !st) {
        LOG(WARNING) << "fail to convert rowset."
                     << ", new_tablet=" << new_tablet->tablet_id()
                     << ", version=" << rowset_writer->version().first << "-"
                     << rowset_writer->version().second << ", error status " << st;
        return st;
    }

    // copy delete bitmap to new tablet.
    if (new_tablet->keys_type() == UNIQUE_KEYS && new_tablet->enable_unique_key_merge_on_write()) {
        DeleteBitmap src_bitmap(base_tablet->tablet_id());
        const auto src_rowset_id = rowset_reader->rowset()->rowset_id();
        base_tablet->tablet_meta()->delete_bitmap().subset(
                {src_rowset_id, 0, 0}, {src_rowset_id, UINT32_MAX, INT64_MAX}, &src_bitmap);
        for (auto& entry : src_bitmap.delete_bitmap) {
            int ret = new_tablet->tablet_meta()->delete_bitmap().set(
                    {rowset_writer->rowset_id(), std::get<1>(entry.first),
                     std::get<2>(entry.first)},
                    entry.second);
            DCHECK(ret == 1);
        }
    }
    return Status::OK();
}
// Streams blocks from the reader, converts each to the new schema, and writes
// it out directly (no sorting pass).
Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                             RowsetWriter* rowset_writer, BaseTabletSPtr new_tablet,
                                             TabletSchemaSPtr base_tablet_schema,
                                             TabletSchemaSPtr new_tablet_schema) {
    while (true) {
        auto new_block = vectorized::Block::create_unique(new_tablet_schema->create_block());
        auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());
        bool last_block = false;
        auto st = rowset_reader->next_block(ref_block.get());
        if (!st) {
            if (!st.is<ErrorCode::END_OF_FILE>()) {
                return st;
            }
            if (ref_block->rows() == 0) {
                break;
            }
            // EOF but the final block still carries rows: convert it, then stop.
            last_block = true;
        }

        RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
        RETURN_IF_ERROR(rowset_writer->add_block(new_block.get()));

        if (last_block) {
            break;
        }
    }

    RETURN_IF_ERROR(rowset_writer->flush());
    return Status::OK();
}
// Sorting variant of schema change: remembers the changer and memory budget,
// and starts the temp-version counter from a mock version.
VBaseSchemaChangeWithSorting::VBaseSchemaChangeWithSorting(const BlockChanger& changer,
                                                           size_t memory_limitation)
        : _changer(changer),
          _memory_limitation(memory_limitation),
          _temp_delta_versions(Version::mock()) {
    // Tag the tracker with the changer's address so concurrent jobs can be
    // told apart in memory reports.
    const auto changer_addr = reinterpret_cast<int64_t>(&changer);
    _mem_tracker = std::make_unique<MemTracker>(
            fmt::format("VSchemaChangeWithSorting:changer={}", changer_addr));
}
// Converts blocks while buffering them in memory; whenever the memory budget
// is reached the buffered blocks are internally sorted and spilled into an
// intermediate rowset. At the end, the intermediate rowsets are merged
// (external sort) into the final output through `rowset_writer`.
Status VBaseSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                                    RowsetWriter* rowset_writer,
                                                    BaseTabletSPtr new_tablet,
                                                    TabletSchemaSPtr base_tablet_schema,
                                                    TabletSchemaSPtr new_tablet_schema) {
    // for internal sorting
    std::vector<std::unique_ptr<vectorized::Block>> blocks;

    RowsetSharedPtr rowset = rowset_reader->rowset();
    SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
    int64_t newest_write_timestamp = rowset->newest_write_timestamp();
    _temp_delta_versions.first = _temp_delta_versions.second;
    _src_rowsets.clear(); // init _src_rowsets

    // Spill helper: internally sort the buffered blocks into one intermediate
    // rowset tagged with a temporary version, then release tracked memory.
    auto create_rowset = [&]() -> Status {
        if (blocks.empty()) {
            return Status::OK();
        }

        auto rowset = DORIS_TRY(_internal_sorting(
                blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second + 1),
                newest_write_timestamp, new_tablet, BETA_ROWSET, segments_overlap,
                new_tablet_schema));
        _src_rowsets.push_back(std::move(rowset));
        for (auto& block : blocks) {
            _mem_tracker->release(block->allocated_bytes());
        }
        blocks.clear();

        // increase temp version
        _temp_delta_versions.second += 2;
        return Status::OK();
    };

    auto new_block = vectorized::Block::create_unique(new_tablet_schema->create_block());

    bool eof = false;
    do {
        auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());
        auto st = rowset_reader->next_block(ref_block.get());
        if (!st) {
            if (st.is<ErrorCode::END_OF_FILE>()) {
                // A final partial block may still need to be converted.
                if (ref_block->rows() == 0) {
                    break;
                } else {
                    eof = true;
                }
            } else {
                return st;
            }
        }

        RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));

        constexpr double HOLD_BLOCK_MEMORY_RATE =
                0.66; // Reserve some memory for use by other parts of this job
        // Spill when the next block would exceed the budget, when buffered
        // blocks already hold most of it, or when a debug point forces it.
        if (_mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation ||
            _mem_tracker->consumption() > _memory_limitation * HOLD_BLOCK_MEMORY_RATE ||
            DebugPoints::instance()->is_enable(
                    "VBaseSchemaChangeWithSorting._inner_process.create_rowset")) {
            RETURN_IF_ERROR(create_rowset());

            // Even after spilling, a single block larger than the whole
            // budget cannot be handled.
            if (_mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation) {
                return Status::Error<INVALID_ARGUMENT>(
                        "Memory limitation is too small for Schema Change. _memory_limitation={}, "
                        "new_block->allocated_bytes()={}, consumption={}",
                        _memory_limitation, new_block->allocated_bytes(),
                        _mem_tracker->consumption());
            }
        }
        _mem_tracker->consume(new_block->allocated_bytes());

        // move unique ptr
        blocks.push_back(vectorized::Block::create_unique(new_tablet_schema->create_block()));
        swap(blocks.back(), new_block);
    } while (!eof);

    // Spill whatever remains buffered.
    RETURN_IF_ERROR(create_rowset());

    if (_src_rowsets.empty()) {
        RETURN_IF_ERROR(rowset_writer->flush());
    } else {
        RETURN_IF_ERROR(
                _external_sorting(_src_rowsets, rowset_writer, new_tablet, new_tablet_schema));
    }

    return Status::OK();
}
// Sorts the given in-memory blocks and persists them as one intermediate
// rowset with the given temporary version; merged (folded-away) row counts
// are accumulated via _add_merged_rows().
Result<RowsetSharedPtr> VBaseSchemaChangeWithSorting::_internal_sorting(
        const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
        int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
        SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {
    RowsetWriterContext context;
    context.version = version;
    context.rowset_state = VISIBLE;
    context.segments_overlap = segments_overlap;
    context.tablet_schema = new_tablet_schema;
    context.newest_write_timestamp = newest_write_timestamp;
    context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;

    // TODO(plat1ko): Use monad op
    auto writer_result = new_tablet->create_rowset_writer(context, false);
    if (!writer_result.has_value()) [[unlikely]] {
        return unexpected(std::move(writer_result).error());
    }
    std::unique_ptr<RowsetWriter> rowset_writer = std::move(writer_result).value();

    MultiBlockMerger merger(new_tablet);
    uint64_t merged_rows = 0;
    RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows));
    _add_merged_rows(merged_rows);

    RowsetSharedPtr rowset;
    RETURN_IF_ERROR_RESULT(rowset_writer->build(rowset));
    return rowset;
}
// Same as the base implementation, but additionally registers the new rowset
// id as pending with the local storage engine so it is tracked until the job
// completes.
Result<RowsetSharedPtr> VLocalSchemaChangeWithSorting::_internal_sorting(
        const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
        int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
        SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {
    RowsetWriterContext context;
    context.version = version;
    context.rowset_state = VISIBLE;
    context.segments_overlap = segments_overlap;
    context.tablet_schema = new_tablet_schema;
    context.newest_write_timestamp = newest_write_timestamp;
    context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;

    // TODO(plat1ko): Use monad op
    auto writer_result = new_tablet->create_rowset_writer(context, false);
    if (!writer_result.has_value()) [[unlikely]] {
        return unexpected(std::move(writer_result).error());
    }
    std::unique_ptr<RowsetWriter> rowset_writer = std::move(writer_result).value();

    // Track the pending rowset id assigned by the writer before any data is
    // merged into it.
    auto guard = _local_storage_engine.pending_local_rowsets().add(context.rowset_id);
    _pending_rs_guards.push_back(std::move(guard));

    MultiBlockMerger merger(new_tablet);
    uint64_t merged_rows = 0;
    RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows));
    _add_merged_rows(merged_rows);

    RowsetSharedPtr rowset;
    RETURN_IF_ERROR_RESULT(rowset_writer->build(rowset));
    return rowset;
}
// Merges the internally sorted intermediate rowsets into the final output
// rowset, choosing vertical merge when cluster keys are present.
Status VBaseSchemaChangeWithSorting::_external_sorting(std::vector<RowsetSharedPtr>& src_rowsets,
                                                       RowsetWriter* rowset_writer,
                                                       BaseTabletSPtr new_tablet,
                                                       TabletSchemaSPtr new_tablet_schema) {
    std::vector<RowsetReaderSharedPtr> rs_readers;
    for (auto& rowset : src_rowsets) {
        RowsetReaderSharedPtr rs_reader;
        RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
        rs_readers.push_back(rs_reader);
    }

    Merger::Statistics stats;
    if (!new_tablet_schema->cluster_key_uids().empty()) {
        // schema change read rowsets with delete bitmap, so there should be no duplicated keys
        // RETURN_IF_ERROR(Compaction::update_delete_bitmap());
        int64_t way_num = 0;
        int64_t input_rowsets_data_size = 0;
        int64_t input_row_num = 0;
        for (auto& rowset : src_rowsets) {
            way_num += rowset->rowset_meta()->get_merge_way_num();
            input_rowsets_data_size += rowset->data_disk_size();
            input_row_num += rowset->num_rows();
        }
        // Estimate rows per output segment from the average on-disk row size;
        // the "+ 1" terms guard both divisions against zero.
        int64_t avg_segment_rows = config::vertical_compaction_max_segment_size /
                                   (input_rowsets_data_size / (input_row_num + 1) + 1);
        RETURN_IF_ERROR(Merger::vertical_merge_rowsets(
                new_tablet, ReaderType::READER_ALTER_TABLE, *new_tablet_schema, rs_readers,
                rowset_writer, avg_segment_rows, way_num, &stats));
    } else {
        RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, ReaderType::READER_ALTER_TABLE,
                                               *new_tablet_schema, rs_readers, rowset_writer,
                                               &stats));
    }
    _add_merged_rows(stats.merged_rows);
    _add_filtered_rows(stats.filtered_rows);
    return Status::OK();
}
// Runs the base sorting conversion, and no matter how it exits, hands the
// intermediate (internally sorted) rowsets to the local storage engine for
// deferred cleanup.
Status VLocalSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                                     RowsetWriter* rowset_writer,
                                                     BaseTabletSPtr new_tablet,
                                                     TabletSchemaSPtr base_tablet_schema,
                                                     TabletSchemaSPtr new_tablet_schema) {
    Defer cleanup {[this]() {
        // remove the intermediate rowsets generated by internal sorting
        for (auto& rs : _src_rowsets) {
            _local_storage_engine.add_unused_rowset(rs);
        }
    }};
    _pending_rs_guards.clear();
    return VBaseSchemaChangeWithSorting::_inner_process(rowset_reader, rowset_writer, new_tablet,
                                                        base_tablet_schema, new_tablet_schema);
}
// Entry point for an alter-tablet (schema change / rollup) task: validates
// the request, serializes against other schema changes on the base tablet,
// then delegates the data conversion to _do_process_alter_tablet().
Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
    if (!request.__isset.desc_tbl) {
        return Status::Error<INVALID_ARGUMENT>(
                "desc_tbl is not set. Maybe the FE version is not equal to the BE "
                "version.");
    }
    // Both tablets were resolved in the constructor; null means lookup failed.
    if (_base_tablet == nullptr) {
        return Status::Error<TABLE_NOT_FOUND>("fail to find base tablet. base_tablet={}",
                                              request.base_tablet_id);
    }
    if (_new_tablet == nullptr) {
        return Status::Error<TABLE_NOT_FOUND>("fail to find new tablet. new_tablet={}",
                                              request.new_tablet_id);
    }
    LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
              << ", new_tablet_id=" << request.new_tablet_id
              << ", alter_version=" << request.alter_version;

    // Lock schema_change_lock until schema change info is stored in tablet header
    static constexpr long TRY_LOCK_TIMEOUT = 30;
    std::unique_lock schema_change_lock(_base_tablet->get_schema_change_lock(), std::defer_lock);
    if (!schema_change_lock.try_lock_for(std::chrono::seconds(TRY_LOCK_TIMEOUT))) {
        return Status::Error<TRY_LOCK_FAILED>(
                "Failed to obtain schema change lock, there might be inverted index being "
                "built or cooldown running on base_tablet={}",
                request.base_tablet_id);
    }

    Status res = _do_process_alter_tablet(request);
    LOG(INFO) << "finished alter tablet process, res=" << res;
    DBUG_EXECUTE_IF("SchemaChangeJob::process_alter_tablet.leave.sleep", { sleep(5); });
    return res;
}
// Resolves both tablets and prepares the base/new tablet schemas used by the
// conversion. If either tablet lookup fails the schema members stay unset;
// process_alter_tablet() reports the error to the caller.
SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine,
                                 const TAlterTabletReqV2& request, const std::string& job_id)
        : _local_storage_engine(local_storage_engine) {
    _base_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.base_tablet_id);
    _new_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.new_tablet_id);
    if (_base_tablet && _new_tablet) {
        // Layer the request's column list (the newest view of the schema)
        // over the base tablet's own schema.
        _base_tablet_schema = std::make_shared<TabletSchema>();
        _base_tablet_schema->update_tablet_columns(*_base_tablet->tablet_schema(), request.columns);
        // The request only include column info, do not include bitmap or bloomfilter index info,
        // So we also need to copy index info from the real base tablet
        _base_tablet_schema->update_index_info_from(*_base_tablet->tablet_schema());

        // During a schema change, the extracted columns of a variant should not be included in the tablet schema.
        // This is because the schema change for a variant needs to ignore the extracted columns.
        // Otherwise, the schema types in different rowsets might be inconsistent. When performing a schema change,
        // the complete variant is constructed by reading all the sub-columns of the variant.
        _new_tablet_schema = _new_tablet->tablet_schema()->copy_without_variant_extracted_columns();
    }
    _job_id = job_id;
}
// In the past schema change and rollup will create new tablet and will wait for txns starting before the task to finished
// It will cost a lot of time to wait and the task is very difficult to understand.
// In alter task v2, FE will call BE to create tablet and send an alter task to BE to convert historical data.
// The admin should upgrade all BE and then upgrade FE.
// Should delete the old code after upgrade finished.
Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& request) {
DBUG_EXECUTE_IF("SchemaChangeJob._do_process_alter_tablet.sleep", { sleep(10); })
Status res;
signal::tablet_id = _base_tablet->get_table_id();
// check if tablet's state is not_ready, if it is ready, it means the tablet already finished
// check whether the tablet's max continuous version == request.version
if (_new_tablet->tablet_state() != TABLET_NOTREADY) {
res = _validate_alter_result(request);
LOG(INFO) << "tablet's state=" << _new_tablet->tablet_state()
<< " the convert job already finished, check its version"
<< " res=" << res;
return res;
}
_new_tablet->set_alter_failed(false);
Defer defer([this] {
// if tablet state is not TABLET_RUNNING when return, indicates that alter has failed.
if (_new_tablet->tablet_state() != TABLET_RUNNING) {
_new_tablet->set_alter_failed(true);
}
});
LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet "
"to new tablet"
<< " base_tablet=" << _base_tablet->tablet_id()
<< " new_tablet=" << _new_tablet->tablet_id();
std::shared_lock base_migration_rlock(_base_tablet->get_migration_lock(), std::try_to_lock);
if (!base_migration_rlock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED>(
"SchemaChangeJob::_do_process_alter_tablet get lock failed");
}
std::shared_lock new_migration_rlock(_new_tablet->get_migration_lock(), std::try_to_lock);
if (!new_migration_rlock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED>(
"SchemaChangeJob::_do_process_alter_tablet get lock failed");
}
std::vector<Version> versions_to_be_changed;
int64_t end_version = -1;
// reader_context is stack variables, it's lifetime should keep the same
// with rs_readers
RowsetReaderContext reader_context;
std::vector<RowSetSplits> rs_splits;
// delete handlers for new tablet
DeleteHandler delete_handler;
std::vector<ColumnId> return_columns;
// Use tablet schema directly from base tablet, they are the newest schema, not contain
// dropped column during light weight schema change.
// But the tablet schema in base tablet maybe not the latest from FE, so that if fe pass through
// a tablet schema, then use request schema.
size_t num_cols =
request.columns.empty() ? _base_tablet_schema->num_columns() : request.columns.size();
return_columns.resize(num_cols);
for (int i = 0; i < num_cols; ++i) {
return_columns[i] = i;
}
std::vector<uint32_t> cluster_key_idxes;
DBUG_EXECUTE_IF("SchemaChangeJob::_do_process_alter_tablet.block", DBUG_BLOCK);
// begin to find deltas to convert from base tablet to new tablet so that
// obtain base tablet and new tablet's push lock and header write lock to prevent loading data
{
std::lock_guard base_tablet_lock(_base_tablet->get_push_lock());
std::lock_guard new_tablet_lock(_new_tablet->get_push_lock());
std::lock_guard base_tablet_wlock(_base_tablet->get_header_lock());
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
std::lock_guard<std::shared_mutex> new_tablet_wlock(_new_tablet->get_header_lock());
do {
RowsetSharedPtr max_rowset;
// get history data to be converted and it will check if there is hold in base tablet
res = _get_versions_to_be_changed(&versions_to_be_changed, &max_rowset);
if (!res) {
LOG(WARNING) << "fail to get version to be changed. res=" << res;
break;
}
DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.alter_fail", {
res = Status::InternalError(
"inject alter tablet failed. base_tablet={}, new_tablet={}",
request.base_tablet_id, request.new_tablet_id);
LOG(WARNING) << "inject error. res=" << res;
break;
});
// should check the max_version >= request.alter_version, if not the convert is useless
if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) {
res = Status::InternalError(
"base tablet's max version={} is less than request version={}",
(max_rowset == nullptr ? 0 : max_rowset->end_version()),
request.alter_version);
break;
}
// before calculating version_to_be_changed,
// remove all data from new tablet, prevent to rewrite data(those double pushed when wait)
LOG(INFO) << "begin to remove all data before end version from new tablet to prevent "
"rewrite."
<< " new_tablet=" << _new_tablet->tablet_id()
<< ", end_version=" << max_rowset->end_version();
std::vector<RowsetSharedPtr> rowsets_to_delete;
std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
_new_tablet->acquire_version_and_rowsets(&version_rowsets);
std::sort(version_rowsets.begin(), version_rowsets.end(),
[](const std::pair<Version, RowsetSharedPtr>& l,
const std::pair<Version, RowsetSharedPtr>& r) {
return l.first.first < r.first.first;
});
for (auto& pair : version_rowsets) {
if (pair.first.second <= max_rowset->end_version()) {
rowsets_to_delete.push_back(pair.second);
} else if (pair.first.first <= max_rowset->end_version()) {
// If max version is [X-10] and new tablet has version [7-9][10-12],
// we only can remove [7-9] from new tablet. If we add [X-10] to new tablet, it will has version
// cross: [X-10] [10-12].
// So, we should return OLAP_ERR_VERSION_ALREADY_MERGED for fast fail.
return Status::Error<VERSION_ALREADY_MERGED>(
"New tablet has a version {} crossing base tablet's max_version={}",
pair.first.to_string(), max_rowset->end_version());
}
}
std::vector<RowsetSharedPtr> empty_vec;
RETURN_IF_ERROR(_new_tablet->delete_rowsets(rowsets_to_delete, false));
// inherit cumulative_layer_point from base_tablet
// check if new_tablet.ce_point > base_tablet.ce_point?
_new_tablet->set_cumulative_layer_point(-1);
// save tablet meta
_new_tablet->save_meta();
for (auto& rowset : rowsets_to_delete) {
// do not call rowset.remove directly, using gc thread to delete it
_local_storage_engine.add_unused_rowset(rowset);
}
// init one delete handler
for (auto& version : versions_to_be_changed) {
end_version = std::max(end_version, version.second);
}
// acquire data sources correspond to history versions
RETURN_IF_ERROR(
_base_tablet->capture_rs_readers_unlocked(versions_to_be_changed, &rs_splits));
if (rs_splits.empty()) {
res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(
"fail to acquire all data sources. version_num={}, data_source_num={}",
versions_to_be_changed.size(), rs_splits.size());
break;
}
std::vector<RowsetMetaSharedPtr> del_preds;
for (auto&& split : rs_splits) {
const auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
if (!rs_meta->has_delete_predicate() || rs_meta->start_version() > end_version) {
continue;
}
_base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema());
del_preds.push_back(rs_meta);
}
res = delete_handler.init(_base_tablet_schema, del_preds, end_version);
if (!res) {
LOG(WARNING) << "init delete handler failed. base_tablet="
<< _base_tablet->tablet_id() << ", end_version=" << end_version;
break;
}
reader_context.reader_type = ReaderType::READER_ALTER_TABLE;
reader_context.tablet_schema = _base_tablet_schema;
reader_context.need_ordered_result = true;
reader_context.delete_handler = &delete_handler;
reader_context.return_columns = &return_columns;
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, end_version);
if (!_base_tablet_schema->cluster_key_uids().empty()) {
for (const auto& uid : _base_tablet_schema->cluster_key_uids()) {
cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid));
}
reader_context.read_orderby_key_columns = &cluster_key_idxes;
reader_context.is_unique = false;
reader_context.sequence_id_idx = -1;
}
for (auto& rs_split : rs_splits) {
res = rs_split.rs_reader->init(&reader_context);
if (!res) {
LOG(WARNING) << "failed to init rowset reader: " << _base_tablet->tablet_id();
break;
}
}
} while (false);
}
do {
if (!res) {
break;
}
SchemaChangeParams sc_params;
RETURN_IF_ERROR(
DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
sc_params.ref_rowset_readers.reserve(rs_splits.size());
for (RowSetSplits& split : rs_splits) {
sc_params.ref_rowset_readers.emplace_back(split.rs_reader);
}
sc_params.delete_handler = &delete_handler;
sc_params.be_exec_version = request.be_exec_version;
DCHECK(request.__isset.alter_tablet_type);
switch (request.alter_tablet_type) {
case TAlterTabletType::SCHEMA_CHANGE:
sc_params.alter_tablet_type = AlterTabletType::SCHEMA_CHANGE;
break;
case TAlterTabletType::ROLLUP:
sc_params.alter_tablet_type = AlterTabletType::ROLLUP;
break;
case TAlterTabletType::MIGRATION:
sc_params.alter_tablet_type = AlterTabletType::MIGRATION;
break;
}
if (request.__isset.materialized_view_params) {
for (auto item : request.materialized_view_params) {
AlterMaterializedViewParam mv_param;
mv_param.column_name = item.column_name;
if (item.__isset.mv_expr) {
mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
}
sc_params.materialized_params_map.insert(
std::make_pair(to_lower(item.column_name), mv_param));
}
}
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.insert(_new_tablet->tablet_id());
}
int64_t real_alter_version = 0;
sc_params.enable_unique_key_merge_on_write =
_new_tablet->enable_unique_key_merge_on_write();
res = _convert_historical_rowsets(sc_params, &real_alter_version);
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(_new_tablet->tablet_id());
}
if (!res) {
break;
}
DCHECK_GE(real_alter_version, request.alter_version);
if (_new_tablet->keys_type() == UNIQUE_KEYS &&
_new_tablet->enable_unique_key_merge_on_write()) {
res = _calc_delete_bitmap_for_mow_table(real_alter_version);
if (!res) {
break;
}
} else {
// set state to ready
std::lock_guard<std::shared_mutex> new_wlock(_new_tablet->get_header_lock());
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
res = _new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
if (!res) {
break;
}
_new_tablet->save_meta();
}
} while (false);
if (res) {
// _validate_alter_result should be outside the above while loop.
// to avoid requiring the header lock twice.
res = _validate_alter_result(request);
}
// if failed convert history data, then just remove the new tablet
if (!res) {
LOG(WARNING) << "failed to alter tablet. base_tablet=" << _base_tablet->tablet_id()
<< ", drop new_tablet=" << _new_tablet->tablet_id();
// do not drop the new tablet and its data. GC thread will
}
return res;
}
// Returns true while a schema-change conversion is in flight for `tablet_id`.
// Thread-safe: takes a shared (read) lock on _mutex, which guards
// _tablet_ids_in_converting.
bool SchemaChangeJob::tablet_in_converting(int64_t tablet_id) {
    std::shared_lock guard(_mutex);
    return _tablet_ids_in_converting.contains(tablet_id);
}
// Determine the version range [0, max] on the base tablet that must be
// converted, and hand back the rowset carrying that max version so callers
// can validate it against the requested alter_version.
Status SchemaChangeJob::_get_versions_to_be_changed(std::vector<Version>* versions_to_be_changed,
                                                    RowsetSharedPtr* max_rowset) {
    RowsetSharedPtr newest_rowset = _base_tablet->get_rowset_with_max_version();
    if (newest_rowset == nullptr) {
        return Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>("Tablet has no version. base_tablet={}",
                                                          _base_tablet->tablet_id());
    }
    *max_rowset = newest_rowset;
    // Capture a consistent chain of versions covering everything from 0 up to
    // the newest rowset's end version.
    const Version full_range(0, newest_rowset->version().second);
    RETURN_IF_ERROR(_base_tablet->capture_consistent_versions_unlocked(
            full_range, versions_to_be_changed, false, false));
    return Status::OK();
}
// The `real_alter_version` parameter indicates that the version of [0-real_alter_version] is
// converted from a base tablet, only used for the mow table now.
//
// Converts every historical rowset of the base tablet (via sc_params.ref_rowset_readers)
// into the new tablet's schema. First parses the request to decide the conversion
// strategy (sorting / directly / linked), then converts rowset by rowset, registering
// each converted rowset on the new tablet. All exits go through `process_alter_exit`
// so the new tablet's meta is always persisted.
Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc_params,
                                                    int64_t* real_alter_version) {
    LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet."
              << " base_tablet=" << _base_tablet->tablet_id()
              << ", new_tablet=" << _new_tablet->tablet_id() << ", job_id=" << _job_id;
    // find end version
    int32_t end_version = -1;
    for (const auto& ref_rowset_reader : sc_params.ref_rowset_readers) {
        if (ref_rowset_reader->version().second > end_version) {
            end_version = ref_rowset_reader->version().second;
        }
    }
    // Add filter information in change, and filter column information will be set in parse_request
    // And filter some data every time the row block changes
    BlockChanger changer(_new_tablet_schema, *sc_params.desc_tbl);
    bool sc_sorting = false;
    bool sc_directly = false;
    // a.Parse the Alter request and convert it into an internal representation
    Status res = parse_request(sc_params, _base_tablet_schema.get(), _new_tablet_schema.get(),
                               &changer, &sc_sorting, &sc_directly);
    LOG(INFO) << "schema change type, sc_sorting: " << sc_sorting
              << ", sc_directly: " << sc_directly << ", base_tablet=" << _base_tablet->tablet_id()
              << ", new_tablet=" << _new_tablet->tablet_id();
    // Common exit path used by every return below. Persists the new tablet meta and,
    // when `res` is still OK, additionally checks that versions [0, end_version]
    // form a complete, gap-free range on the new tablet.
    auto process_alter_exit = [&]() -> Status {
        {
            // save tablet meta here because rowset meta is not saved during add rowset
            std::lock_guard new_wlock(_new_tablet->get_header_lock());
            SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
            _new_tablet->save_meta();
        }
        if (res) {
            Version test_version(0, end_version);
            res = _new_tablet->check_version_integrity(test_version);
        }
        LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
                  << "base_tablet=" << _base_tablet->tablet_id()
                  << ", new_tablet=" << _new_tablet->tablet_id();
        return res;
    };
    if (!res) {
        LOG(WARNING) << "failed to parse the request. res=" << res;
        return process_alter_exit();
    }
    // A ROLLUP that needs neither sorting nor direct conversion would fall through
    // to linked schema change, which cannot build a materialized view — reject.
    if (!sc_sorting && !sc_directly && sc_params.alter_tablet_type == AlterTabletType::ROLLUP) {
        res = Status::Error<SCHEMA_SCHEMA_INVALID>(
                "Don't support to add materialized view by linked schema change");
        return process_alter_exit();
    }
    // b. Generate historical data converter
    auto sc_procedure = _get_sc_procedure(
            changer, sc_sorting, sc_directly,
            _local_storage_engine.memory_limitation_bytes_per_thread_for_schema_change());
    DBUG_EXECUTE_IF("SchemaChangeJob::_convert_historical_rowsets.block", DBUG_BLOCK);
    // c.Convert historical data
    // Set once add_rowset reports PUSH_VERSION_ALREADY_EXIST; conversion continues,
    // but real_alter_version stops advancing from that point on.
    bool have_failure_rowset = false;
    for (const auto& rs_reader : sc_params.ref_rowset_readers) {
        // set status for monitor
        // As long as there is a new_table as running, ref table is set as running
        // NOTE If the first sub_table fails first, it will continue to go as normal here
        // When tablet create new rowset writer, it may change rowset type, in this case
        // linked schema change will not be used.
        RowsetWriterContext context;
        context.version = rs_reader->version();
        context.rowset_state = VISIBLE;
        context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
        context.tablet_schema = _new_tablet_schema;
        context.newest_write_timestamp = rs_reader->newest_write_timestamp();
        // Source rowset on remote (cold) storage: the converted rowset is written
        // to the same storage resource.
        if (!rs_reader->rowset()->is_local()) {
            context.storage_resource =
                    *DORIS_TRY(rs_reader->rowset()->rowset_meta()->remote_storage_resource());
        }
        context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
        // TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index
        bool vertical = false;
        if (sc_sorting && !_new_tablet->tablet_schema()->cluster_key_uids().empty()) {
            // see VBaseSchemaChangeWithSorting::_external_sorting
            vertical = true;
        }
        auto result = _new_tablet->create_rowset_writer(context, vertical);
        if (!result.has_value()) {
            res = Status::Error<ROWSET_BUILDER_INIT>("create_rowset_writer failed, reason={}",
                                                     result.error().to_string());
            return process_alter_exit();
        }
        auto rowset_writer = std::move(result).value();
        // Guard keeps the pending rowset from being garbage-collected while we write it.
        auto pending_rs_guard = _local_storage_engine.add_pending_rowset(context);
        if (res = sc_procedure->process(rs_reader, rowset_writer.get(), _new_tablet, _base_tablet,
                                        _base_tablet_schema, _new_tablet_schema);
            !res) {
            LOG(WARNING) << "failed to process the version."
                         << " version=" << rs_reader->version().first << "-"
                         << rs_reader->version().second << ", " << res.to_string();
            return process_alter_exit();
        }
        // Add the new version of the data to the header
        // In order to prevent the occurrence of deadlock, we must first lock the old table, and then lock the new table
        std::lock_guard lock(_new_tablet->get_push_lock());
        RowsetSharedPtr new_rowset;
        if (!(res = rowset_writer->build(new_rowset)).ok()) {
            LOG(WARNING) << "failed to build rowset, exit alter process";
            return process_alter_exit();
        }
        res = _new_tablet->add_rowset(new_rowset);
        if (res.is<PUSH_VERSION_ALREADY_EXIST>()) {
            // This version was already published on the new tablet (presumably by a
            // double-written load); drop our converted copy and continue — not fatal.
            LOG(WARNING) << "version already exist, version revert occurred. "
                         << "tablet=" << _new_tablet->tablet_id() << ", version='"
                         << rs_reader->version().first << "-" << rs_reader->version().second;
            _local_storage_engine.add_unused_rowset(new_rowset);
            have_failure_rowset = true;
            res = Status::OK();
        } else if (!res) {
            LOG(WARNING) << "failed to register new version. "
                         << " tablet=" << _new_tablet->tablet_id()
                         << ", version=" << rs_reader->version().first << "-"
                         << rs_reader->version().second;
            _local_storage_engine.add_unused_rowset(new_rowset);
            return process_alter_exit();
        } else {
            VLOG_NOTICE << "register new version. tablet=" << _new_tablet->tablet_id()
                        << ", version=" << rs_reader->version().first << "-"
                        << rs_reader->version().second;
        }
        if (!have_failure_rowset) {
            *real_alter_version = rs_reader->version().second;
        }
        VLOG_TRACE << "succeed to convert a history version."
                   << " version=" << rs_reader->version().first << "-"
                   << rs_reader->version().second;
    }
    // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version
    return process_alter_exit();
}
// Lower-cased name of the hidden "where sign" column; parse_request looks this
// key up in materialized_params_map to install the WHERE filter expression of a
// filtered materialized view.
static const std::string WHERE_SIGN_LOWER = to_lower("__DORIS_WHERE_SIGN__");
// @static
// Analyze the mapping of the column and the mapping of the filter key.
//
// Fills `changer` with one ColumnMapping per column of the new schema and
// decides how historical data must be converted:
//   *sc_sorting  - data must be re-sorted / re-aggregated (key order or keys
//                  type changed, or an agg model lost key columns);
//   *sc_directly - data must be rewritten row by row (new columns before the
//                  short keys, mv exprs, delete predicates, index diffs,
//                  row-store changes, remote rowsets, ...).
// If neither flag ends up set, a linked (hard-link) schema change is possible.
Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
                                      TabletSchema* base_tablet_schema,
                                      TabletSchema* new_tablet_schema, BlockChanger* changer,
                                      bool* sc_sorting, bool* sc_directly) {
    changer->set_type(sc_params.alter_tablet_type);
    changer->set_compatible_version(sc_params.be_exec_version);
    const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map =
            sc_params.materialized_params_map;
    // set column mapping
    for (int i = 0, new_schema_size = new_tablet_schema->num_columns(); i < new_schema_size; ++i) {
        const TabletColumn& new_column = new_tablet_schema->column(i);
        const std::string& column_name_lower = to_lower(new_column.name());
        ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
        column_mapping->new_column = &new_column;
        column_mapping->ref_column_idx = base_tablet_schema->field_index(new_column.name());
        // Single hash lookup (was find()!=end() followed by a second find()), and the
        // mapped value is read through the iterator instead of copying the whole
        // AlterMaterializedViewParam.
        if (auto mv_it = materialized_function_map.find(column_name_lower);
            mv_it != materialized_function_map.end()) {
            column_mapping->expr = mv_it->second.expr;
            if (column_mapping->expr != nullptr) {
                continue;
            }
        }
        if (column_mapping->ref_column_idx >= 0) {
            continue;
        }
        // A rollup must resolve every output column from the base table or the mv map.
        if (sc_params.alter_tablet_type == ROLLUP) {
            std::string materialized_function_map_str;
            // const& avoids copying each map entry while building the error message.
            for (const auto& entry : materialized_function_map) {
                if (!materialized_function_map_str.empty()) {
                    materialized_function_map_str += ',';
                }
                materialized_function_map_str += entry.first;
            }
            return Status::InternalError(
                    "referenced column was missing. [column={},materialized_function_map={}]",
                    new_column.name(), materialized_function_map_str);
        }
        if (new_column.name().find("__doris_shadow_") == 0) {
            // Should delete in the future, just a protection for bug.
            LOG(INFO) << "a shadow column is encountered " << new_column.name();
            return Status::InternalError("failed due to operate on shadow column");
        }
        // Newly added column go here
        column_mapping->ref_column_idx = -1;
        // A brand-new column inside the short-key prefix changes key layout;
        // linked schema change is impossible.
        if (i < base_tablet_schema->num_short_key_columns()) {
            *sc_directly = true;
        }
        RETURN_IF_ERROR(
                _init_column_mapping(column_mapping, new_column, new_column.default_value()));
        LOG(INFO) << "A column with default value will be added after schema changing. "
                  << "column=" << new_column.name()
                  << ", default_value=" << new_column.default_value();
    }
    // Install the WHERE filter of a filtered materialized view, if present
    // (single lookup instead of contains() + find()).
    if (auto where_it = materialized_function_map.find(WHERE_SIGN_LOWER);
        where_it != materialized_function_map.end()) {
        changer->set_where_expr(where_it->second.expr);
    }
    // If the reference sequence of the Key column is out of order, it needs to be reordered
    int num_default_value = 0;
    for (int i = 0, new_schema_size = new_tablet_schema->num_key_columns(); i < new_schema_size;
         ++i) {
        ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
        if (!column_mapping->has_reference()) {
            num_default_value++;
            continue;
        }
        if (column_mapping->ref_column_idx != i - num_default_value) {
            *sc_sorting = true;
            return Status::OK();
        }
    }
    if (base_tablet_schema->keys_type() != new_tablet_schema->keys_type()) {
        // only when base table is dup and mv is agg
        // the rollup job must be reagg.
        *sc_sorting = true;
        return Status::OK();
    }
    // If the sort of key has not been changed but the new keys num is less then base's,
    // the new table should be re agg.
    // So we also need to set sc_sorting = true.
    // A, B, C are keys(sort keys), D is value
    // followings need resort:
    //      old keys:    A   B   C   D
    //      new keys:    A   B
    if (new_tablet_schema->keys_type() != KeysType::DUP_KEYS &&
        new_tablet_schema->num_key_columns() < base_tablet_schema->num_key_columns()) {
        // this is a table with aggregate key type, and num of key columns in new schema
        // is less, which means the data in new tablet should be more aggregated.
        // so we use sorting schema change to sort and merge the data.
        *sc_sorting = true;
        return Status::OK();
    }
    if (sc_params.alter_tablet_type == ROLLUP) {
        *sc_directly = true;
        return Status::OK();
    }
    if (sc_params.enable_unique_key_merge_on_write &&
        new_tablet_schema->num_key_columns() > base_tablet_schema->num_key_columns()) {
        *sc_directly = true;
        return Status::OK();
    }
    if (base_tablet_schema->num_short_key_columns() != new_tablet_schema->num_short_key_columns()) {
        // the number of short_keys changed, can't do linked schema change
        *sc_directly = true;
        return Status::OK();
    }
    if (!sc_params.delete_handler->empty()) {
        // there exists delete condition in header, can't do linked schema change
        *sc_directly = true;
        return Status::OK();
    }
    // if new tablet enable row store, or new tablet has different row store columns
    if ((!base_tablet_schema->exist_column(BeConsts::ROW_STORE_COL) &&
         new_tablet_schema->exist_column(BeConsts::ROW_STORE_COL)) ||
        !std::equal(new_tablet_schema->row_columns_uids().begin(),
                    new_tablet_schema->row_columns_uids().end(),
                    base_tablet_schema->row_columns_uids().begin(),
                    base_tablet_schema->row_columns_uids().end())) {
        *sc_directly = true;
    }
    for (size_t i = 0; i < new_tablet_schema->num_columns(); ++i) {
        ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
        if (column_mapping->expr != nullptr) {
            *sc_directly = true;
            return Status::OK();
        } else if (column_mapping->ref_column_idx >= 0) {
            // index changed
            if (vectorized::schema_util::has_schema_index_diff(
                        new_tablet_schema, base_tablet_schema, i, column_mapping->ref_column_idx)) {
                *sc_directly = true;
                return Status::OK();
            }
        }
    }
    // if rs_reader has remote files, link schema change is not supported,
    // use directly schema change instead.
    if (!(*sc_directly) && !(*sc_sorting)) {
        // check has remote rowset
        // work for cloud and cold storage
        for (const auto& rs_reader : sc_params.ref_rowset_readers) {
            if (!rs_reader->rowset()->is_local()) {
                *sc_directly = true;
                break;
            }
        }
    }
    return Status::OK();
}
// Build the default-value WrapperField for a newly added column.
// A nullable column with an empty default gets NULL; otherwise `value` is
// parsed according to the column's precision/scale.
Status SchemaChangeJob::_init_column_mapping(ColumnMapping* column_mapping,
                                             const TabletColumn& column_schema,
                                             const std::string& value) {
    auto field = WrapperField::create(column_schema);
    if (!field.has_value()) {
        return field.error();
    }
    column_mapping->default_value = field.value();
    if (column_schema.is_nullable() && value.empty()) {
        column_mapping->default_value->set_null();
        return Status::OK();
    }
    RETURN_IF_ERROR(column_mapping->default_value->from_string(value, column_schema.precision(),
                                                               column_schema.frac()));
    return Status::OK();
}
// Sanity-check the finished alter job on the new tablet:
//  1) the max continuous version must reach the requested alter_version;
//  2) every rowset currently referenced by the new tablet must have its files
//     present on disk.
Status SchemaChangeJob::_validate_alter_result(const TAlterTabletReqV2& request) {
    Version continuous_version = {-1, 0};
    _new_tablet->max_continuous_version_from_beginning(&continuous_version);
    LOG(INFO) << "find max continuous version of tablet=" << _new_tablet->tablet_id()
              << ", start_version=" << continuous_version.first
              << ", end_version=" << continuous_version.second;
    if (continuous_version.second < request.alter_version) {
        return Status::InternalError("result version={} is less than request version={}",
                                     continuous_version.second, request.alter_version);
    }
    // Snapshot the (version, rowset) pairs under the header read lock, then
    // check files outside the lock.
    std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
    {
        std::shared_lock rdlock(_new_tablet->get_header_lock());
        _new_tablet->acquire_version_and_rowsets(&version_rowsets);
    }
    for (const auto& [version, rowset] : version_rowsets) {
        if (!rowset->check_file_exist()) {
            return Status::Error<NOT_FOUND>(
                    "SchemaChangeJob::_validate_alter_result meet invalid rowset");
        }
    }
    return Status::OK();
}
// For unique with merge-on-write table, should process delete bitmap here.
// 1. During double write, the newly imported rowsets does not calculate
// delete bitmap and publish successfully.
// 2. After conversion, calculate delete bitmap for the rowsets imported
// during double write. During this period, new data can still be imported
// without calculating delete bitmap and publish successfully.
// 3. Block the new publish, calculate the delete bitmap of the
// incremental rowsets.
// 4. Switch the tablet status to TABLET_RUNNING. The newly imported
// data will calculate delete bitmap.
Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) {
    DBUG_EXECUTE_IF("SchemaChangeJob._calc_delete_bitmap_for_mow_table.random_failed", {
        if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
            LOG_WARNING("SchemaChangeJob._calc_delete_bitmap_for_mow_table.random_failed");
            return Status::InternalError("debug schema change calc delete bitmap random failed");
        }
    });
    // can't do compaction when calc delete bitmap, if the rowset being calculated does
    // a compaction, it may cause the delete bitmap to be missed.
    std::lock_guard base_compaction_lock(_new_tablet->get_base_compaction_lock());
    std::lock_guard cumu_compaction_lock(_new_tablet->get_cumulative_compaction_lock());
    // step 2: handle rowsets double-written while conversion ran,
    // i.e. versions (alter_version, max_version].
    int64_t max_version = _new_tablet->max_version().second;
    std::vector<RowsetSharedPtr> rowsets;
    if (alter_version < max_version) {
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
                  << "double write rowsets for version: " << alter_version + 1 << "-" << max_version
                  << " new_tablet=" << _new_tablet->tablet_id();
        std::shared_lock rlock(_new_tablet->get_header_lock());
        RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked(
                {alter_version + 1, max_version}, &rowsets));
    }
    // const& avoids an atomic shared_ptr refcount bump per rowset.
    // Locks are re-taken per iteration so concurrent publishes can interleave.
    for (const auto& rowset_ptr : rowsets) {
        std::lock_guard rwlock(_new_tablet->get_rowset_update_lock());
        std::shared_lock rlock(_new_tablet->get_header_lock());
        RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr));
    }
    // step 3: hold the rowset-update and header locks for the rest of the function
    // to block new publishes, then process rowsets that landed since step 2.
    std::lock_guard rwlock(_new_tablet->get_rowset_update_lock());
    std::lock_guard new_wlock(_new_tablet->get_header_lock());
    SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
    int64_t new_max_version = _new_tablet->max_version_unlocked();
    rowsets.clear();
    if (max_version < new_max_version) {
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
                  << "incremental rowsets for version: " << max_version + 1 << "-"
                  << new_max_version << " new_tablet=" << _new_tablet->tablet_id();
        RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked(
                {max_version + 1, new_max_version}, &rowsets));
    }
    for (auto&& rowset_ptr : rowsets) {
        RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr));
    }
    // step 4: conversion complete — make the tablet visible for normal imports.
    RETURN_IF_ERROR(_new_tablet->set_tablet_state(TabletState::TABLET_RUNNING));
    _new_tablet->save_meta();
    return Status::OK();
}
} // namespace doris