| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "cloud/cloud_index_change_compaction.h" |
| |
| #include <gtest/gtest.h> |
| |
| #include "cloud/cloud_base_compaction.h" |
| #include "cloud/cloud_cumulative_compaction.h" |
| #include "cpp/sync_point.h" |
| #include "json2pb/json_to_pb.h" |
| #include "olap/rowset/beta_rowset.h" |
| |
| namespace doris { |
| |
| class CloudIndexChangeCompactionTest : public testing::Test { |
| public: |
| void SetUp() { |
| _engine = std::make_unique<CloudStorageEngine>(EngineOptions {}); |
| auto sp = SyncPoint::get_instance(); |
| sp->enable_processing(); |
| } |
| |
| void TearDown() {} |
| |
| void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) { |
| std::string json_rowset_meta = R"({ |
| "rowset_id": 540085, |
| "tablet_id": 15674, |
| "partition_id": 10000, |
| "txn_id": 4045, |
| "tablet_schema_hash": 567997588, |
| "rowset_type": "BETA_ROWSET", |
| "rowset_state": "VISIBLE", |
| "start_version": 2, |
| "end_version": 2, |
| "num_rows": 3929, |
| "total_disk_size": 84699, |
| "data_disk_size": 84464, |
| "index_disk_size": 235, |
| "empty": false, |
| "load_id": { |
| "hi": -5350970832824939812, |
| "lo": -6717994719194512122 |
| }, |
| "creation_time": 1553765670 |
| })"; |
| |
| RowsetMetaPB rowset_meta_pb; |
| json2pb::JsonToProtoMessage(json_rowset_meta, &rowset_meta_pb); |
| rowset_meta_pb.set_start_version(start); |
| rowset_meta_pb.set_end_version(end); |
| rowset_meta_pb.set_creation_time(10000); |
| |
| pb1->init_from_pb(rowset_meta_pb); |
| } |
| |
| bool contains_str(std::string origin, std::string sub_str) { |
| return origin.find(sub_str) != std::string::npos; |
| } |
| |
| public: |
| std::unique_ptr<CloudStorageEngine> _engine; |
| }; |
| |
| TEST_F(CloudIndexChangeCompactionTest, ms_ret_status_test) { |
| std::vector<TOlapTableIndex> index_list; |
| TOlapTableIndex index; |
| index.index_id = 1; |
| index.columns.emplace_back("col1"); |
| index_list.push_back(index); |
| |
| TabletSchemaPB schema_pb; |
| schema_pb.set_keys_type(KeysType::DUP_KEYS); |
| |
| ColumnPB* column_pb = schema_pb.add_column(); |
| column_pb->set_unique_id(10000); |
| column_pb->set_name("col1"); |
| column_pb->set_type("int"); |
| column_pb->set_is_key(true); |
| column_pb->set_is_nullable(true); |
| auto tablet_schema = std::make_shared<TabletSchema>(); |
| tablet_schema->init_from_pb(schema_pb); |
| |
| auto rowset_meta = std::make_shared<RowsetMeta>(); |
| init_rs_meta(rowset_meta, 1, 1); |
| rowset_meta->set_num_segments(2); |
| RowsetSharedPtr rowset_ptr = std::make_shared<BetaRowset>(tablet_schema, rowset_meta, ""); |
| |
| TabletMetaPB input_meta_pb; |
| input_meta_pb.set_tablet_id(1000); |
| input_meta_pb.set_schema_hash(123456); |
| input_meta_pb.set_tablet_state(PB_RUNNING); |
| *input_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto(); |
| |
| auto tablet_meta = std::make_shared<TabletMeta>(); |
| tablet_meta->init_from_pb(input_meta_pb); |
| |
| CloudTabletSPtr tablet = std::make_shared<CloudTablet>(*_engine, tablet_meta); |
| |
| Version v1; |
| v1.first = 1; |
| v1.second = 2; |
| tablet->_rs_version_map[v1] = rowset_ptr; |
| |
| std::vector<TColumn> columns; |
| |
| { |
| Status ret_set = tablet->set_tablet_state(TabletState::TABLET_TOMBSTONED); |
| std::shared_ptr<CloudIndexChangeCompaction> index_change_compact = |
| std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list, |
| columns); |
| Status prepare_ret = index_change_compact->prepare_compact(); |
| ASSERT_TRUE(!prepare_ret.ok()); |
| ASSERT_TRUE(contains_str(prepare_ret.to_string(), "invalid tablet state")); |
| ASSERT_TRUE(ret_set.ok()); |
| } |
| |
| Status ret_set2 = tablet->set_tablet_state(TabletState::TABLET_RUNNING); |
| ASSERT_TRUE(ret_set2.ok()); |
| |
| // 1 test prepare |
| { |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [](auto&& outcome) { |
| auto* pairs = try_any_cast_ret<Status>(outcome); |
| pairs->second = true; |
| pairs->first = Status::OK(); |
| }); |
| std::shared_ptr<CloudIndexChangeCompaction> index_change_compact = |
| std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list, |
| columns); |
| Status prepare_ret = index_change_compact->prepare_compact(); |
| ASSERT_TRUE(prepare_ret.ok()); |
| } |
| |
| { |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [](auto&& outcome) { |
| auto* pairs = try_any_cast_ret<Status>(outcome); |
| pairs->second = true; |
| pairs->first = Status::InternalError<false>("mock sync meta failed"); |
| }); |
| |
| std::shared_ptr<CloudIndexChangeCompaction> index_change_compact = |
| std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list, |
| columns); |
| Status prepare_ret = index_change_compact->prepare_compact(); |
| ASSERT_TRUE(!prepare_ret.ok()); |
| ASSERT_TRUE(contains_str(prepare_ret.to_string(), "mock sync meta failed")); |
| } |
| |
| // 2 test request global lock |
| { |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) { |
| auto* pairs = try_any_cast_ret<Status>(outcome); |
| pairs->second = true; |
| pairs->first = Status::OK(); |
| }); |
| |
| std::shared_ptr<CloudIndexChangeCompaction> index_change_compact = |
| std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list, |
| columns); |
| index_change_compact->_input_rowsets.push_back(rowset_ptr); |
| bool should_skip_err = false; |
| Status ret = index_change_compact->request_global_lock(should_skip_err); |
| ASSERT_TRUE(ret.ok()); |
| ASSERT_TRUE(!should_skip_err); |
| } |
| |
| { |
| // stale tablet error |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) { |
| auto* pairs = try_any_cast_ret<Status>(outcome); |
| pairs->second = true; |
| pairs->first = Status::InternalError<false>("mock stale tablet error"); |
| |
| auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]); |
| resp->mutable_status()->set_code(cloud::STALE_TABLET_CACHE); |
| resp->mutable_status()->set_msg("mock stale tablet error"); |
| }); |
| |
| std::shared_ptr<CloudIndexChangeCompaction> index_change_compact = |
| std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list, |
| columns); |
| index_change_compact->_input_rowsets.push_back(rowset_ptr); |
| bool should_skip_err = false; |
| Status ret = index_change_compact->request_global_lock(should_skip_err); |
| ASSERT_TRUE(!ret.ok()); |
| ASSERT_TRUE(should_skip_err); |
| ASSERT_TRUE(contains_str(ret.to_string(), "mock stale tablet error")); |
| } |
| |
| { |
| // tablet not found |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) { |
| auto* pairs = try_any_cast_ret<Status>(outcome); |
| pairs->second = true; |
| pairs->first = Status::InternalError<false>("mock tablet not found"); |
| |
| auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]); |
| resp->mutable_status()->set_code(cloud::TABLET_NOT_FOUND); |
| resp->mutable_status()->set_msg("mock tablet not found"); |
| }); |
| |
| std::shared_ptr<CloudIndexChangeCompaction> index_change_compact = |
| std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list, |
| columns); |
| index_change_compact->_input_rowsets.push_back(rowset_ptr); |
| bool should_skip_err = false; |
| Status ret = index_change_compact->request_global_lock(should_skip_err); |
| ASSERT_TRUE(!ret.ok()); |
| ASSERT_TRUE(!should_skip_err); |
| ASSERT_TRUE(contains_str(ret.to_string(), "mock tablet not found")); |
| } |
| |
| { |
| // job tablet busy |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) { |
| auto* pairs = try_any_cast_ret<Status>(outcome); |
| pairs->second = true; |
| pairs->first = Status::InternalError<false>("mock job tablet busy"); |
| |
| auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]); |
| resp->mutable_status()->set_code(cloud::JOB_TABLET_BUSY); |
| resp->mutable_status()->set_msg("mock job tablet busy"); |
| }); |
| std::shared_ptr<CloudIndexChangeCompaction> index_change_compact = |
| std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list, |
| columns); |
| index_change_compact->_input_rowsets.push_back(rowset_ptr); |
| bool should_skip_err = false; |
| Status ret = index_change_compact->request_global_lock(should_skip_err); |
| ASSERT_TRUE(!ret.ok()); |
| ASSERT_TRUE(!should_skip_err); |
| ASSERT_TRUE(contains_str(ret.to_string(), "index change compaction no suitable versions")); |
| } |
| |
| { |
| // job check alter version |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) { |
| auto* pairs = try_any_cast_ret<Status>(outcome); |
| pairs->second = true; |
| pairs->first = Status::InternalError<false>("mock job check alter version"); |
| |
| auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]); |
| resp->mutable_status()->set_code(cloud::JOB_CHECK_ALTER_VERSION); |
| resp->mutable_status()->set_msg("mock job check alter version"); |
| }); |
| |
| std::shared_ptr<CloudIndexChangeCompaction> index_change_compact = |
| std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list, |
| columns); |
| index_change_compact->_input_rowsets.push_back(rowset_ptr); |
| bool should_skip_err = false; |
| Status ret = index_change_compact->request_global_lock(should_skip_err); |
| ASSERT_TRUE(!ret.ok()); |
| ASSERT_TRUE(!should_skip_err); |
| ASSERT_TRUE(contains_str(ret.to_string(), "failed in schema change.")); |
| } |
| |
| // modify rowsets |
| { |
| // tablet not found |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("CloudMetaMgr::commit_tablet_job", [](auto&& outcome) { |
| auto* pairs = try_any_cast_ret<Status>(outcome); |
| pairs->second = true; |
| pairs->first = Status::InternalError<false>("mock tablet not found"); |
| |
| auto* resp = try_any_cast<cloud::FinishTabletJobResponse*>(outcome[1]); |
| resp->mutable_status()->set_code(cloud::TABLET_NOT_FOUND); |
| resp->mutable_status()->set_msg("mock tablet not found"); |
| }); |
| |
| std::shared_ptr<CloudIndexChangeCompaction> index_change_compact = |
| std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list, |
| columns); |
| index_change_compact->_input_rowsets.push_back(rowset_ptr); |
| index_change_compact->_output_rowset = rowset_ptr; |
| index_change_compact->_compact_type = cloud::TabletCompactionJobPB::BASE; |
| Status ret = index_change_compact->modify_rowsets(); |
| ASSERT_TRUE(!ret.ok()); |
| ASSERT_TRUE(contains_str(ret.to_string(), "mock tablet not found")); |
| } |
| |
| { |
| // job check alter version |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("CloudMetaMgr::commit_tablet_job", [](auto&& outcome) { |
| auto* pairs = try_any_cast_ret<Status>(outcome); |
| pairs->second = true; |
| pairs->first = Status::InternalError<false>("mock check alter version"); |
| |
| auto* resp = try_any_cast<cloud::FinishTabletJobResponse*>(outcome[1]); |
| resp->mutable_status()->set_code(cloud::JOB_CHECK_ALTER_VERSION); |
| resp->mutable_status()->set_msg("mock check alter version"); |
| }); |
| |
| std::shared_ptr<CloudIndexChangeCompaction> index_change_compact = |
| std::make_shared<CloudIndexChangeCompaction>(*_engine, tablet, 0, index_list, |
| columns); |
| index_change_compact->_input_rowsets.push_back(rowset_ptr); |
| index_change_compact->_output_rowset = rowset_ptr; |
| index_change_compact->_compact_type = cloud::TabletCompactionJobPB::BASE; |
| Status ret = index_change_compact->modify_rowsets(); |
| ASSERT_TRUE(!ret.ok()); |
| ASSERT_TRUE(contains_str(ret.to_string(), "failed in schema change")); |
| } |
| } |
| |
| TEST_F(CloudIndexChangeCompactionTest, basic_compaction_test) { |
| TabletSchemaPB schema_pb; |
| schema_pb.set_keys_type(KeysType::DUP_KEYS); |
| |
| ColumnPB* column_pb = schema_pb.add_column(); |
| column_pb->set_unique_id(10000); |
| column_pb->set_name("col1"); |
| column_pb->set_type("int"); |
| column_pb->set_is_key(true); |
| column_pb->set_is_nullable(true); |
| auto tablet_schema1 = std::make_shared<TabletSchema>(); |
| tablet_schema1->init_from_pb(schema_pb); |
| |
| auto tablet_schema2 = std::make_shared<TabletSchema>(); |
| tablet_schema2->init_from_pb(schema_pb); |
| |
| auto rowset_meta = std::make_shared<RowsetMeta>(); |
| init_rs_meta(rowset_meta, 0, 1); |
| rowset_meta->set_num_segments(2); |
| rowset_meta->_schema = tablet_schema1; |
| RowsetSharedPtr rowset_ptr = std::make_shared<BetaRowset>(tablet_schema1, rowset_meta, ""); |
| |
| auto output_rowset_meta = std::make_shared<RowsetMeta>(); |
| init_rs_meta(output_rowset_meta, 0, 1); |
| output_rowset_meta->set_num_segments(2); |
| output_rowset_meta->_schema = tablet_schema1; |
| Version v0; |
| v0.first = 10; |
| v0.second = 10; |
| output_rowset_meta->set_version(v0); |
| RowsetSharedPtr output_rowset_ptr = |
| std::make_shared<BetaRowset>(tablet_schema1, output_rowset_meta, ""); |
| |
| CloudTabletSPtr tablet = |
| std::make_shared<CloudTablet>(*_engine, std::make_shared<TabletMeta>()); |
| |
| Version v1; |
| v1.first = 0; |
| v1.second = 2; |
| tablet->_rs_version_map[v1] = rowset_ptr; |
| |
| std::vector<TOlapTableIndex> index_list; |
| std::vector<TColumn> columns; |
| CloudIndexChangeCompaction cloud_index_change_compaction(*_engine, tablet, 123, index_list, |
| columns); |
| |
| cloud_index_change_compaction._input_rowsets.clear(); |
| cloud_index_change_compaction._input_rowsets.push_back(rowset_ptr); |
| Status re_build_schema = cloud_index_change_compaction.rebuild_tablet_schema(); |
| ASSERT_TRUE(cloud_index_change_compaction._final_tablet_schema != nullptr); |
| ASSERT_TRUE(cloud_index_change_compaction._cur_tablet_schema->schema_version() == 123); |
| ASSERT_TRUE(re_build_schema.ok()); |
| |
| // test is_index_change_compaction |
| CloudBaseCompaction cloud_base_compaction(*_engine, tablet); |
| CloudCumulativeCompaction cloud_cumu_compaction(*_engine, tablet); |
| ASSERT_TRUE(cloud_index_change_compaction.is_index_change_compaction()); |
| ASSERT_FALSE(cloud_base_compaction.is_index_change_compaction()); |
| ASSERT_FALSE(cloud_cumu_compaction.is_index_change_compaction()); |
| |
| // test delete predicate |
| cloud_index_change_compaction._input_rowsets.clear(); |
| cloud_index_change_compaction._input_rowsets.push_back(rowset_ptr); |
| cloud_index_change_compaction._output_rowset = output_rowset_ptr; |
| |
| DeletePredicatePB del_pred_pb; |
| InPredicatePB* in_pred = del_pred_pb.add_in_predicates(); |
| in_pred->set_column_name("col1"); |
| in_pred->set_is_not_in(true); |
| in_pred->add_values("123"); |
| in_pred->add_values("456"); |
| rowset_ptr->rowset_meta()->set_delete_predicate(del_pred_pb); |
| |
| cloud_index_change_compaction._allow_delete_in_cumu_compaction = false; |
| reinterpret_cast<Compaction*>(&cloud_index_change_compaction) |
| ->set_delete_predicate_for_output_rowset(); |
| |
| auto& in_predicates = cloud_index_change_compaction._output_rowset->rowset_meta() |
| ->delete_predicate() |
| .in_predicates(); |
| ASSERT_TRUE(in_predicates[0].column_name() == "col1"); |
| ASSERT_TRUE(in_predicates[0].is_not_in() == true); |
| ASSERT_TRUE(in_predicates[0].values().size() == 2); |
| } |
| |
| TEST_F(CloudIndexChangeCompactionTest, test_cloud_tablet) { |
| TabletSchemaPB schema_pb; |
| schema_pb.set_keys_type(KeysType::DUP_KEYS); |
| |
| ColumnPB* column_pb = schema_pb.add_column(); |
| column_pb->set_unique_id(10000); |
| column_pb->set_name("col1"); |
| column_pb->set_type("int"); |
| column_pb->set_is_key(true); |
| column_pb->set_is_nullable(true); |
| auto tablet_schema = std::make_shared<TabletSchema>(); |
| tablet_schema->init_from_pb(schema_pb); |
| tablet_schema->set_schema_version(10); |
| |
| auto rowset_meta = std::make_shared<RowsetMeta>(); |
| init_rs_meta(rowset_meta, 1, 1); |
| rowset_meta->set_num_segments(2); |
| rowset_meta->set_num_rows(0); |
| RowsetSharedPtr rowset_ptr = std::make_shared<BetaRowset>(tablet_schema, rowset_meta, ""); |
| |
| TabletMetaPB input_meta_pb; |
| input_meta_pb.set_tablet_id(1000); |
| input_meta_pb.set_schema_hash(123456); |
| input_meta_pb.set_tablet_state(PB_RUNNING); |
| *input_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto(); |
| |
| auto tablet_meta = std::make_shared<TabletMeta>(); |
| tablet_meta->init_from_pb(input_meta_pb); |
| |
| CloudTabletSPtr tablet = std::make_shared<CloudTablet>(*_engine, tablet_meta); |
| |
| Version v1; |
| v1.first = 1; |
| v1.second = 2; |
| tablet->_rs_version_map[v1] = rowset_ptr; |
| |
| { |
| bool is_base_rowset = false; |
| auto ret = tablet->pick_a_rowset_for_index_change(1, is_base_rowset); |
| ASSERT_TRUE(ret.value() == nullptr); |
| ASSERT_TRUE(!is_base_rowset); |
| } |
| |
| { |
| std::vector<TColumn> t_columns; |
| TColumn col1; |
| col1.__set_column_name("col2"); |
| col1.__set_col_unique_id(123); |
| |
| TColumnType column_type; |
| column_type.__set_type(TPrimitiveType::STRING); |
| col1.__set_column_type(column_type); |
| |
| t_columns.emplace_back(std::move(col1)); |
| |
| Status ret1 = tablet->check_rowset_schema_for_build_index(t_columns, 1000); |
| ASSERT_TRUE(!ret1.ok()); |
| ASSERT_TRUE(contains_str(ret1.to_string(), "col is dropped")); |
| } |
| |
| { |
| std::vector<TColumn> t_columns; |
| TColumn col1; |
| col1.__set_column_name("col1"); |
| col1.__set_col_unique_id(123); |
| |
| TColumnType column_type; |
| column_type.__set_type(TPrimitiveType::STRING); |
| col1.__set_column_type(column_type); |
| |
| t_columns.emplace_back(std::move(col1)); |
| |
| Status ret1 = tablet->check_rowset_schema_for_build_index(t_columns, 1000); |
| ASSERT_TRUE(!ret1.ok()); |
| ASSERT_TRUE(contains_str(ret1.to_string(), "col id not match")); |
| } |
| |
| { |
| std::vector<TColumn> t_columns; |
| TColumn col1; |
| col1.__set_column_name("col1"); |
| col1.__set_col_unique_id(10000); |
| |
| TColumnType column_type; |
| column_type.__set_type(TPrimitiveType::STRING); |
| col1.__set_column_type(column_type); |
| |
| t_columns.emplace_back(std::move(col1)); |
| |
| Status ret1 = tablet->check_rowset_schema_for_build_index(t_columns, 1000); |
| ASSERT_TRUE(!ret1.ok()); |
| ASSERT_TRUE(contains_str(ret1.to_string(), "col type not match")); |
| } |
| |
| { |
| std::vector<TColumn> t_columns; |
| TColumn col1; |
| col1.__set_column_name("col1"); |
| col1.__set_col_unique_id(10000); |
| |
| TColumnType column_type; |
| column_type.__set_type(TPrimitiveType::INT); |
| col1.__set_column_type(column_type); |
| |
| t_columns.emplace_back(std::move(col1)); |
| |
| Status ret1 = tablet->check_rowset_schema_for_build_index(t_columns, 1000); |
| ASSERT_TRUE(ret1.ok()); |
| } |
| } |
| |
| } // namespace doris |