blob: adecbba327a04f1d2d55525eb1425415a7f2bdb7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <brpc/controller.h>
#include <gen_cpp/cloud.pb.h>
#include <gtest/gtest.h>
#include "common/config.h"
#include "meta-service/meta_service.h"
#include "meta-store/keys.h"
#include "meta-store/mem_txn_kv.h"
#include "mock_resource_manager.h"
#include "rate-limiter/rate_limiter.h"
namespace doris::cloud {
// Test: commit_txn always updates last_active_cluster_id
// MS always updates this field, BE controls whether to use it for compaction skipping
TEST(CompactionRWSeparationTest, CommitTxnUpdatesLastActiveCluster) {
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);
// Clear data
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->remove("\x00", "\xfe");
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
auto rs = std::make_shared<MockResourceManager>(txn_kv);
auto rl = std::make_shared<RateLimiter>();
auto snapshot = std::make_shared<SnapshotManager>(txn_kv);
MetaServiceImpl meta_service(txn_kv, rs, rl, snapshot);
int64_t db_id = 1;
int64_t table_id = 100;
int64_t index_id = 100;
int64_t partition_id = 100;
int64_t tablet_id = 10001;
// Create tablet
{
brpc::Controller cntl;
CreateTabletsRequest req;
CreateTabletsResponse res;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_db_id(db_id);
auto tablet = req.add_tablet_metas();
tablet->set_table_id(table_id);
tablet->set_index_id(index_id);
tablet->set_partition_id(partition_id);
tablet->set_tablet_id(tablet_id);
auto schema = tablet->mutable_schema();
schema->set_schema_version(0);
auto first_rowset = tablet->add_rs_metas();
first_rowset->set_rowset_id(0);
first_rowset->set_rowset_id_v2("rowset_0");
first_rowset->set_start_version(0);
first_rowset->set_end_version(1);
first_rowset->mutable_tablet_schema()->CopyFrom(*schema);
meta_service.create_tablets(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
}
// Begin txn
int64_t txn_id = -1;
{
brpc::Controller cntl;
BeginTxnRequest req;
BeginTxnResponse res;
req.set_cloud_unique_id("test_cloud_unique_id");
auto txn_info = req.mutable_txn_info();
txn_info->set_db_id(db_id);
txn_info->set_label("test_label_1");
txn_info->add_table_ids(table_id);
txn_info->set_timeout_ms(36000);
// FE sets load_cluster_id during begin_txn for compaction RW separation
txn_info->set_load_cluster_id(mock_cluster_id);
meta_service.begin_txn(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
txn_id = res.txn_id();
}
// Prepare rowset
{
brpc::Controller cntl;
CreateRowsetRequest req;
CreateRowsetResponse res;
req.set_cloud_unique_id("test_cloud_unique_id");
auto rowset = req.mutable_rowset_meta();
rowset->set_rowset_id(0);
rowset->set_rowset_id_v2("rowset_txn_1");
rowset->set_tablet_id(tablet_id);
rowset->set_partition_id(partition_id);
rowset->set_txn_id(txn_id);
rowset->set_num_segments(1);
rowset->set_num_rows(100);
rowset->set_data_disk_size(1024);
rowset->mutable_tablet_schema()->set_schema_version(0);
rowset->set_txn_expiration(::time(nullptr) + 3600);
meta_service.prepare_rowset(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
}
// Commit rowset
{
brpc::Controller cntl;
CreateRowsetRequest req;
CreateRowsetResponse res;
req.set_cloud_unique_id("test_cloud_unique_id");
auto rowset = req.mutable_rowset_meta();
rowset->set_rowset_id(0);
rowset->set_rowset_id_v2("rowset_txn_1");
rowset->set_tablet_id(tablet_id);
rowset->set_partition_id(partition_id);
rowset->set_txn_id(txn_id);
rowset->set_num_segments(1);
rowset->set_num_rows(100);
rowset->set_data_disk_size(1024);
rowset->mutable_tablet_schema()->set_schema_version(0);
rowset->set_txn_expiration(::time(nullptr) + 3600);
meta_service.commit_rowset(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
}
// Commit txn
{
brpc::Controller cntl;
CommitTxnRequest req;
CommitTxnResponse res;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_db_id(db_id);
req.set_txn_id(txn_id);
meta_service.commit_txn(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
}
// Get rowset and check last_active_cluster_id
{
brpc::Controller cntl;
GetRowsetRequest req;
GetRowsetResponse res;
req.set_cloud_unique_id("test_cloud_unique_id");
req.mutable_idx()->set_tablet_id(tablet_id);
req.set_start_version(0);
req.set_end_version(-1);
req.set_base_compaction_cnt(0);
req.set_cumulative_compaction_cnt(0);
req.set_cumulative_point(2);
meta_service.get_rowset(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
// Check that last_active_cluster_id is always set (MS always updates it)
EXPECT_TRUE(res.stats().has_last_active_cluster_id());
EXPECT_EQ(res.stats().last_active_cluster_id(), mock_cluster_id);
}
}
} // namespace doris::cloud