// blob: c886c43cf1bc4c2ff54e8215cd15056494c85eb3
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <brpc/controller.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <google/protobuf/arena.h>
#include <gtest/gtest.h>
#include <memory>
#include <random>
#include "common/config.h"
#include "common/defer.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_schema.h"
#include "meta-store/blob_message.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
// Instance id handed back by the "get_instance_id" sync-point callbacks in the tests below and
// used when building the meta keys those tests read back from the KV store.
static std::string instance_id = "schema_kv_test";
namespace doris::cloud {
// Declared here and defined in another translation unit; each test calls it to obtain the
// MetaServiceProxy it drives.
extern std::unique_ptr<MetaServiceProxy> get_meta_service();
// Hands out a new rowset id on every call: "1", "2", "3", ... The counter is a function-local
// static, so ids are never repeated for the lifetime of the test binary.
static std::string next_rowset_id() {
    static int last_issued = 0;
    last_issued += 1;
    return std::to_string(last_issued);
}
// Stamps `schema_version` on `schema` and appends ten "INT" columns whose unique ids run from
// 20000 to 20009.
static void fill_schema(doris::TabletSchemaCloudPB* schema, int32_t schema_version) {
    constexpr int kColumnCount = 10;
    constexpr int kFirstUniqueId = 20000;

    schema->set_schema_version(schema_version);
    for (int offset = 0; offset != kColumnCount; ++offset) {
        auto* col = schema->add_column();
        col->set_type("INT");
        col->set_unique_id(kFirstUniqueId + offset);
    }
}
// Appends one tablet meta to `req`. The tablet gets the given ids, a schema produced by
// fill_schema(), and a single initial rowset covering versions [0-1] that carries a copy of
// that schema.
static void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id,
                       int64_t partition_id, int64_t tablet_id, const std::string& rowset_id,
                       int32_t schema_version) {
    auto* tablet_meta = req.add_tablet_metas();
    tablet_meta->set_table_id(table_id);
    tablet_meta->set_index_id(index_id);
    tablet_meta->set_partition_id(partition_id);
    tablet_meta->set_tablet_id(tablet_id);

    auto* tablet_schema = tablet_meta->mutable_schema();
    fill_schema(tablet_schema, schema_version);

    auto* initial_rowset = tablet_meta->add_rs_metas();
    initial_rowset->set_rowset_id(0); // required
    initial_rowset->set_rowset_id_v2(rowset_id);
    initial_rowset->set_start_version(0);
    initial_rowset->set_end_version(1);
    initial_rowset->mutable_tablet_schema()->CopyFrom(*tablet_schema);
}
// Sends a create_tablets request containing exactly one tablet (built by add_tablet) and
// raises a fatal gtest failure, tagged with the tablet id, unless the status code is OK.
static void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id,
                          int64_t partition_id, int64_t tablet_id, const std::string& rowset_id,
                          int32_t schema_version) {
    CreateTabletsRequest request;
    add_tablet(request, table_id, index_id, partition_id, tablet_id, rowset_id, schema_version);

    brpc::Controller controller;
    CreateTabletsResponse response;
    meta_service->create_tablets(&controller, &request, &response, nullptr);

    ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << tablet_id;
}
// Issues a get_rowset request for the given tablet (start_version 0, end_version -1, both
// compaction counters 0, cumulative point 2) and stores the reply in `res`. Raises a fatal
// gtest failure, tagged with the tablet id, unless the status code is OK.
static void get_rowset(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id,
                       int64_t partition_id, int64_t tablet_id, GetRowsetResponse& res) {
    GetRowsetRequest request;

    auto* idx = request.mutable_idx();
    idx->set_table_id(table_id);
    idx->set_index_id(index_id);
    idx->set_partition_id(partition_id);
    idx->set_tablet_id(tablet_id);

    request.set_start_version(0);
    request.set_end_version(-1);
    request.set_base_compaction_cnt(0);
    request.set_cumulative_compaction_cnt(0);
    request.set_cumulative_point(2);

    brpc::Controller controller;
    meta_service->get_rowset(&controller, &request, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
}
// Fetches the tablet through get_tablet and checks that the call succeeds, that a tablet meta
// is returned, that the meta carries a schema, and that its schema_version equals
// `schema_version`. Every failure message is tagged with the tablet id.
static void check_get_tablet(MetaServiceProxy* meta_service, int64_t tablet_id,
                             int32_t schema_version) {
    GetTabletRequest request;
    request.set_tablet_id(tablet_id);

    brpc::Controller controller;
    GetTabletResponse response;
    meta_service->get_tablet(&controller, &request, &response, nullptr);

    ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << tablet_id;
    ASSERT_TRUE(response.has_tablet_meta()) << tablet_id;
    const auto& tablet_meta = response.tablet_meta();
    EXPECT_TRUE(tablet_meta.has_schema()) << tablet_id;
    EXPECT_EQ(tablet_meta.schema_version(), schema_version) << tablet_id;
}
// Creates tablets under both settings of config::write_schema_kv and checks, by reading the
// raw values back from the KV store, whether the saved tablet/rowset metas embed the schema.
// Also checks that get_tablet / get_rowset responses carry a schema in the cases covered.
TEST(DetachSchemaKVTest, TabletTest) {
auto meta_service = get_meta_service();
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
// Deferred cleanup: clears the sync-point callbacks registered below.
DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->clear_all_call_backs();
};
// Answer the "get_instance_id" sync point with the test instance id.
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
ret->second = true;
});
sp->enable_processing();
// new MS write with write_schema_kv=false, old MS read
{
constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004;
config::write_schema_kv = false;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id,
tablet_id, next_rowset_id(), 1));
// check saved values in txn_kv
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string tablet_key, tablet_val;
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}, &tablet_key);
ASSERT_EQ(txn->get(tablet_key, &tablet_val), TxnErrorCode::TXN_OK);
doris::TabletMetaCloudPB saved_tablet;
ASSERT_TRUE(saved_tablet.ParseFromString(tablet_val));
// With write_schema_kv=false the saved tablet value is expected to embed the schema.
EXPECT_TRUE(saved_tablet.has_schema());
std::string rowset_key, rowset_val;
// The rowset created with the tablet ends at version 1, which is the version used in the key.
meta_rowset_key({instance_id, tablet_id, 1}, &rowset_key);
ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK);
doris::RowsetMetaCloudPB saved_rowset;
ASSERT_TRUE(saved_rowset.ParseFromString(rowset_val));
// Likewise the saved rowset value is expected to embed the schema.
EXPECT_TRUE(saved_rowset.has_tablet_schema());
}
// old MS write, new MS read
{
constexpr auto table_id = 10011, index_id = 10012, partition_id = 10013, tablet_id = 10014;
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
// Write a tablet meta with an embedded one-column schema straight into the KV store,
// bypassing create_tablets.
doris::TabletMetaCloudPB saved_tablet;
saved_tablet.set_table_id(table_id);
saved_tablet.set_index_id(index_id);
saved_tablet.set_partition_id(partition_id);
saved_tablet.set_tablet_id(tablet_id);
saved_tablet.mutable_schema()->set_schema_version(1);
auto column = saved_tablet.mutable_schema()->add_column();
column->set_unique_id(30001);
column->set_type("INT");
std::string tablet_key, tablet_val;
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}, &tablet_key);
ASSERT_TRUE(saved_tablet.SerializeToString(&tablet_val));
txn->put(tablet_key, tablet_val);
// Also write the tablet index entry for the same tablet id in the same transaction.
TabletIndexPB saved_tablet_idx;
saved_tablet_idx.set_table_id(table_id);
saved_tablet_idx.set_index_id(index_id);
saved_tablet_idx.set_partition_id(partition_id);
saved_tablet_idx.set_tablet_id(tablet_id);
std::string tablet_idx_key, tablet_idx_val;
meta_tablet_idx_key({instance_id, tablet_id}, &tablet_idx_key);
ASSERT_TRUE(saved_tablet_idx.SerializeToString(&tablet_idx_val));
txn->put(tablet_idx_key, tablet_idx_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
// check get tablet response
check_get_tablet(meta_service.get(), tablet_id, 1);
}
// Reads the saved tablet value through `txn` and expects it to have no embedded schema
// while its schema_version field equals `schema_version`.
auto check_new_saved_tablet_val = [](Transaction* txn, int64_t table_id, int64_t index_id,
int64_t partition_id, int64_t tablet_id,
int32_t schema_version) {
std::string tablet_key, tablet_val;
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}, &tablet_key);
ASSERT_EQ(txn->get(tablet_key, &tablet_val), TxnErrorCode::TXN_OK);
doris::TabletMetaCloudPB saved_tablet;
ASSERT_TRUE(saved_tablet.ParseFromString(tablet_val));
EXPECT_FALSE(saved_tablet.has_schema()) << tablet_id;
EXPECT_EQ(saved_tablet.schema_version(), schema_version) << tablet_id;
};
// new MS write with write_schema_kv=true, new MS read
{
constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024;
config::write_schema_kv = true;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id,
tablet_id, next_rowset_id(), 1));
// check saved values in txn_kv
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
check_new_saved_tablet_val(txn.get(), table_id, index_id, partition_id, tablet_id, 1);
std::string rowset_key, rowset_val;
meta_rowset_key({instance_id, tablet_id, 1}, &rowset_key);
ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK);
doris::RowsetMetaCloudPB saved_rowset;
ASSERT_TRUE(saved_rowset.ParseFromString(rowset_val));
// The saved rowset is expected to carry no embedded schema, only index_id and schema_version.
EXPECT_FALSE(saved_rowset.has_tablet_schema());
EXPECT_EQ(saved_rowset.index_id(), index_id);
EXPECT_EQ(saved_rowset.schema_version(), 1);
// check get tablet response
check_get_tablet(meta_service.get(), tablet_id, 1);
// check get rowset response
GetRowsetResponse get_rowset_res;
get_rowset(meta_service.get(), table_id, index_id, partition_id, tablet_id, get_rowset_res);
ASSERT_EQ(get_rowset_res.rowset_meta_size(), 1);
// The rowset in the response is expected to have its schema even though the saved value does not.
EXPECT_TRUE(get_rowset_res.rowset_meta(0).has_tablet_schema());
EXPECT_EQ(get_rowset_res.rowset_meta(0).index_id(), index_id);
EXPECT_EQ(get_rowset_res.rowset_meta(0).schema_version(), 1);
// Only the initial rowset exists, so all size/row statistics are expected to be zero.
ASSERT_TRUE(get_rowset_res.has_stats());
EXPECT_EQ(get_rowset_res.stats().num_rows(), 0);
EXPECT_EQ(get_rowset_res.stats().num_rowsets(), 1);
EXPECT_EQ(get_rowset_res.stats().num_segments(), 0);
EXPECT_EQ(get_rowset_res.stats().data_size(), 0);
EXPECT_EQ(get_rowset_res.stats().index_size(), 0);
EXPECT_EQ(get_rowset_res.stats().segment_size(), 0);
}
// new MS batch create tablets with write_schema_kv=true
{
config::write_schema_kv = true;
brpc::Controller cntl;
CreateTabletsRequest req;
CreateTabletsResponse res;
// Nine tablets in one request: two index ids (10032, 10034), several tablets sharing a
// schema version within the same index.
add_tablet(req, 10031, 10032, 10033, 100031, next_rowset_id(), 1);
add_tablet(req, 10031, 10032, 10033, 100032, next_rowset_id(), 2);
add_tablet(req, 10031, 10032, 10033, 100033, next_rowset_id(), 2);
add_tablet(req, 10031, 10032, 10033, 100034, next_rowset_id(), 3);
add_tablet(req, 10031, 10032, 10033, 100035, next_rowset_id(), 3);
add_tablet(req, 10031, 10032, 10033, 100036, next_rowset_id(), 3);
add_tablet(req, 10031, 10034, 10033, 100037, next_rowset_id(), 1);
add_tablet(req, 10031, 10034, 10033, 100038, next_rowset_id(), 2);
add_tablet(req, 10031, 10034, 10033, 100039, next_rowset_id(), 2);
meta_service->create_tablets(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
// check saved values in txn_kv
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100031, 1);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100032, 2);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100033, 2);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100034, 3);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100035, 3);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100036, 3);
check_new_saved_tablet_val(txn.get(), 10031, 10034, 10033, 100037, 1);
check_new_saved_tablet_val(txn.get(), 10031, 10034, 10033, 100038, 2);
check_new_saved_tablet_val(txn.get(), 10031, 10034, 10033, 100039, 2);
// check get tablet response
check_get_tablet(meta_service.get(), 100031, 1);
check_get_tablet(meta_service.get(), 100032, 2);
check_get_tablet(meta_service.get(), 100033, 2);
check_get_tablet(meta_service.get(), 100034, 3);
check_get_tablet(meta_service.get(), 100035, 3);
check_get_tablet(meta_service.get(), 100036, 3);
check_get_tablet(meta_service.get(), 100037, 1);
check_get_tablet(meta_service.get(), 100038, 2);
check_get_tablet(meta_service.get(), 100039, 2);
}
}
// Writes a tablet schema through put_schema_kv and put_versioned_schema_kv, commits, and then
// reads both keys back. Schema version 0 is written in two consecutive passes over the same
// keys; afterwards schema version 1 is written once under new keys.
TEST(DetachSchemaKVTest, PutSchemaKvTest) {
    config::meta_schema_value_version = 1;
    auto meta_service = get_meta_service();

    int64_t index_id = 14221;
    int64_t schema_version = 0;
    std::string plain_key = meta_schema_key({instance_id, index_id, schema_version});
    std::string doc_key = versioned::meta_schema_key({instance_id, index_id, schema_version});

    doris::TabletSchemaCloudPB schema;
    fill_schema(&schema, schema_version);

    std::unique_ptr<Transaction> kv_txn;
    MetaServiceCode status_code = MetaServiceCode::OK;
    std::string status_msg;

    for (int pass = 0; pass != 2; ++pass) {
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&kv_txn), TxnErrorCode::TXN_OK);
        put_schema_kv(status_code, status_msg, kv_txn.get(), plain_key, schema);
        ASSERT_EQ(status_code, MetaServiceCode::OK);
        put_versioned_schema_kv(status_code, status_msg, kv_txn.get(), doc_key, schema);
        ASSERT_EQ(status_code, MetaServiceCode::OK);
        ASSERT_EQ(kv_txn->commit(), TxnErrorCode::TXN_OK);

        // verify the tablet schema is written
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&kv_txn), TxnErrorCode::TXN_OK);
        doris::TabletSchemaCloudPB read_back;
        ValueBuf blob;
        ASSERT_EQ(cloud::blob_get(kv_txn.get(), plain_key, &blob), TxnErrorCode::TXN_OK);

        // verify the versioned tablet schema is written
        ASSERT_EQ(document_get(kv_txn.get(), doc_key, &read_back), TxnErrorCode::TXN_OK);
        EXPECT_EQ(read_back.schema_version(), schema_version);
    }

    {
        // put new schema version
        schema_version = 1;
        schema.set_schema_version(schema_version);
        plain_key = meta_schema_key({instance_id, index_id, schema_version});
        doc_key = versioned::meta_schema_key({instance_id, index_id, schema_version});

        ASSERT_EQ(meta_service->txn_kv()->create_txn(&kv_txn), TxnErrorCode::TXN_OK);
        put_schema_kv(status_code, status_msg, kv_txn.get(), plain_key, schema);
        ASSERT_EQ(status_code, MetaServiceCode::OK);
        put_versioned_schema_kv(status_code, status_msg, kv_txn.get(), doc_key, schema);
        ASSERT_EQ(status_code, MetaServiceCode::OK);
        ASSERT_EQ(kv_txn->commit(), TxnErrorCode::TXN_OK);

        // verify the tablet schema is written
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&kv_txn), TxnErrorCode::TXN_OK);
        doris::TabletSchemaCloudPB read_back;
        ValueBuf blob;
        ASSERT_EQ(cloud::blob_get(kv_txn.get(), plain_key, &blob), TxnErrorCode::TXN_OK);

        // verify the versioned tablet schema is written
        ASSERT_EQ(document_get(kv_txn.get(), doc_key, &read_back), TxnErrorCode::TXN_OK);
        EXPECT_EQ(read_back.schema_version(), schema_version);
    }
}
// Begins a transaction for `table_id` in `db_id` under `label` (timeout_ms 36000) and writes
// the id returned by the service to `txn_id`. Raises a fatal gtest failure, tagged with the
// label, if the call does not succeed or no txn id is returned; `txn_id` is then left untouched.
static void begin_txn(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label,
                      int64_t table_id, int64_t& txn_id) {
    BeginTxnRequest request;
    auto* info = request.mutable_txn_info();
    info->set_db_id(db_id);
    info->set_label(label);
    info->add_table_ids(table_id);
    info->set_timeout_ms(36000);

    brpc::Controller controller;
    BeginTxnResponse response;
    meta_service->begin_txn(&controller, &request, &response, nullptr);

    ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << label;
    ASSERT_TRUE(response.has_txn_id()) << label;
    txn_id = response.txn_id();
}
// Commits transaction `txn_id` of `db_id`. `label` is only used to tag the failure message
// emitted when the returned status code is not OK.
static void commit_txn(MetaServiceProxy* meta_service, int64_t db_id, int64_t txn_id,
                       const std::string& label) {
    CommitTxnRequest request;
    request.set_db_id(db_id);
    request.set_txn_id(txn_id);

    brpc::Controller controller;
    CommitTxnResponse response;
    meta_service->commit_txn(&controller, &request, &response, nullptr);

    ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << label;
}
// Builds a rowset meta for `tablet_id` / `txn_id` with fixed statistics (100 rows, 1 segment,
// data 10000, index 1000, total 11000) and the current time as txn expiration.
//   version        - when > 0, used as both start and end version; otherwise neither is set.
//   schema         - when non-null, copied into the rowset's tablet schema.
//   schema_version - always stamped on the rowset's tablet schema, overriding whatever version
//                    a copied `schema` carried.
static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id,
                                              const std::string& rowset_id, int32_t schema_version,
                                              int64_t version = -1,
                                              const TabletSchemaCloudPB* schema = nullptr) {
    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(0); // required
    meta.set_rowset_id_v2(rowset_id);
    meta.set_tablet_id(tablet_id);
    meta.set_txn_id(txn_id);

    meta.set_num_rows(100);
    meta.set_num_segments(1);
    meta.set_data_disk_size(10000);
    meta.set_index_disk_size(1000);
    meta.set_total_disk_size(11000);

    const bool has_explicit_version = version > 0;
    if (has_explicit_version) {
        meta.set_start_version(version);
        meta.set_end_version(version);
    }

    meta.set_txn_expiration(::time(nullptr)); // Required by DCHECK

    auto* embedded_schema = meta.mutable_tablet_schema();
    if (schema != nullptr) {
        embedded_schema->CopyFrom(*schema);
    }
    embedded_schema->set_schema_version(schema_version);
    return meta;
}
// Calls prepare_rowset on the meta service with a request carrying a copy of `rowset`; the
// reply is written to `res`. No assertion is made here: callers inspect `res.status()`.
//
// The request is created on the same arena as `res` (if any). When `res` has no arena the
// request is heap-allocated, and it is owned by a std::unique_ptr so it is released on every
// exit path instead of relying on a manual delete after the RPC.
static void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
                           CreateRowsetResponse& res) {
    brpc::Controller cntl;
    auto* arena = res.GetArena();
    auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
    // Owns the request only in the no-arena case; arena-allocated messages must not be deleted.
    std::unique_ptr<CreateRowsetRequest> heap_req_owner(arena == nullptr ? req : nullptr);
    req->mutable_rowset_meta()->CopyFrom(rowset);
    meta_service->prepare_rowset(&cntl, req, &res, nullptr);
}
// Calls commit_rowset on the meta service with a request carrying a copy of `rowset`; the
// reply is written to `res`. No assertion is made here: callers inspect `res.status()`.
//
// The request is created on the same arena as `res` (if any). When `res` has no arena the
// request is heap-allocated, and it is owned by a std::unique_ptr so it is released on every
// exit path instead of relying on a manual delete after the RPC.
static void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
                          CreateRowsetResponse& res) {
    brpc::Controller cntl;
    auto* arena = res.GetArena();
    auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
    // Owns the request only in the no-arena case; arena-allocated messages must not be deleted.
    std::unique_ptr<CreateRowsetRequest> heap_req_owner(arena == nullptr ? req : nullptr);
    req->mutable_rowset_meta()->CopyFrom(rowset);
    meta_service->commit_rowset(&cntl, req, &res, nullptr);
}
// Runs one full load for `tablet_id`: begin_txn, prepare_rowset, commit_rowset, commit_txn.
// The rowset is built by create_rowset with no explicit version and a fresh rowset id, and
// its has_variant_type_in_schema flag is set exactly when `schema` is non-null. Failures are
// reported through gtest and tagged with `label`.
static void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label,
                          int64_t table_id, int64_t tablet_id, int32_t schema_version,
                          const TabletSchemaCloudPB* schema = nullptr) {
    int64_t txn_id = 0;
    ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service, db_id, label, table_id, txn_id));

    CreateRowsetResponse response;
    auto rowset_meta =
            create_rowset(txn_id, tablet_id, next_rowset_id(), schema_version, -1, schema);
    const bool carries_schema = schema != nullptr;
    rowset_meta.set_has_variant_type_in_schema(carries_schema);

    prepare_rowset(meta_service, rowset_meta, response);
    ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << label;

    // Reuse the same response object for the commit step.
    response.Clear();
    ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service, rowset_meta, response));
    ASSERT_EQ(response.status().code(), MetaServiceCode::OK)
            << label << ", msg=" << response.status().msg();

    commit_txn(meta_service, db_id, txn_id, label);
}
// Builds a schema (schema_version 10086) with three columns, in order: INT (unique id -1),
// DOUBLE (unique id -1) and VARIANT (unique id 10) holding one sparse DOUBLE column
// (unique id -1). It also has two indexes: id 111 with suffix name "aaabbbccc", and id 222.
static TabletSchemaCloudPB getVariantSchema() {
    TabletSchemaCloudPB variant_schema;
    variant_schema.set_schema_version(10086);

    // columns
    auto* int_sub = variant_schema.add_column();
    int_sub->set_type("INT");
    int_sub->set_unique_id(-1);

    auto* double_sub = variant_schema.add_column();
    double_sub->set_type("DOUBLE");
    double_sub->set_unique_id(-1);

    auto* variant_col = variant_schema.add_column();
    variant_col->set_type("VARIANT");
    variant_col->set_unique_id(10);
    auto* sparse_sub = variant_col->add_sparse_columns();
    sparse_sub->set_type("DOUBLE");
    sparse_sub->set_unique_id(-1);

    // indexes
    auto* first_index = variant_schema.add_index();
    first_index->set_index_id(111);
    first_index->set_index_suffix_name("aaabbbccc");

    auto* second_index = variant_schema.add_index();
    second_index->set_index_id(222);

    return variant_schema;
}
// Inserts rowsets under both settings of config::write_schema_kv and checks (a) whether the
// saved rowset value embeds the tablet schema and (b) that get_rowset responses always carry
// the schema, index_id and schema_version, including for a tablet whose rowsets were written
// under a mix of both settings.
TEST(DetachSchemaKVTest, RowsetTest) {
    auto meta_service = get_meta_service();
    // meta_service->resource_mgr().reset(); // Do not use resource manager
    auto sp = SyncPoint::get_instance();
    // Deferred cleanup: clears the sync-point callbacks registered below.
    DORIS_CLOUD_DEFER {
        SyncPoint::get_instance()->clear_all_call_backs();
    };
    // Answer the "get_instance_id" sync point with the test instance id.
    sp->set_call_back("get_instance_id", [&](auto&& args) {
        auto* ret = try_any_cast_ret<std::string>(args);
        ret->first = instance_id;
        ret->second = true;
    });
    sp->enable_processing();
    constexpr int64_t db_id = 10000;
    // new MS write with write_schema_kv=false, old MS read
    {
        constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004;
        config::write_schema_kv = false;
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id,
                                              tablet_id, next_rowset_id(), 1));
        ASSERT_NO_FATAL_FAILURE(
                insert_rowset(meta_service.get(), db_id, "101", table_id, tablet_id, 2)); // [2-2]
        // check saved values in txn_kv
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string rowset_key, rowset_val;
        meta_rowset_key({instance_id, tablet_id, 2}, &rowset_key); // [2-2]
        ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB saved_rowset;
        ASSERT_TRUE(saved_rowset.ParseFromString(rowset_val));
        // With write_schema_kv=false the saved rowset is expected to embed its schema.
        ASSERT_TRUE(saved_rowset.has_tablet_schema());
        EXPECT_EQ(saved_rowset.tablet_schema().schema_version(), 2);
    }
    // old MS write, new MS read
    {
        constexpr auto table_id = 10011, index_id = 10012, partition_id = 10013, tablet_id = 10014;
        config::write_schema_kv = false;
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id,
                                              tablet_id, next_rowset_id(), 1));
        // Write a rowset with an embedded schema straight into the KV store, bypassing the
        // prepare/commit RPCs.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        auto saved_rowset = create_rowset(10015, tablet_id, next_rowset_id(), 2, 2);
        std::string rowset_key, rowset_val;
        meta_rowset_key({instance_id, tablet_id, 2}, &rowset_key); // version=[2-2]
        ASSERT_TRUE(saved_rowset.SerializeToString(&rowset_val));
        txn->put(rowset_key, rowset_val);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
        // check get rowset response
        GetRowsetResponse get_rowset_res;
        get_rowset(meta_service.get(), table_id, index_id, partition_id, tablet_id, get_rowset_res);
        ASSERT_EQ(get_rowset_res.rowset_meta_size(), 2);
        ASSERT_EQ(get_rowset_res.rowset_meta(0).end_version(), 1); // [0-1]
        ASSERT_TRUE(get_rowset_res.rowset_meta(0).has_tablet_schema());
        EXPECT_EQ(get_rowset_res.rowset_meta(0).tablet_schema().schema_version(), 1);
        EXPECT_EQ(get_rowset_res.rowset_meta(0).index_id(), index_id);
        EXPECT_EQ(get_rowset_res.rowset_meta(0).schema_version(), 1);
        ASSERT_EQ(get_rowset_res.rowset_meta(1).end_version(), 2); // [2-2]
        ASSERT_TRUE(get_rowset_res.rowset_meta(1).has_tablet_schema());
        EXPECT_EQ(get_rowset_res.rowset_meta(1).tablet_schema().schema_version(), 2);
        EXPECT_EQ(get_rowset_res.rowset_meta(1).index_id(), index_id);
        EXPECT_EQ(get_rowset_res.rowset_meta(1).schema_version(), 2);
    }
    // new MS write with write_schema_kv=true, new MS read
    {
        constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024;
        config::write_schema_kv = true;
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id,
                                              tablet_id, next_rowset_id(), 1));
        ASSERT_NO_FATAL_FAILURE(
                insert_rowset(meta_service.get(), db_id, "201", table_id, tablet_id, 2)); // [2-2]
        // check saved values in txn_kv
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string rowset_key, rowset_val;
        meta_rowset_key({instance_id, 10024, 2}, &rowset_key); // [2-2]
        ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB saved_rowset;
        ASSERT_TRUE(saved_rowset.ParseFromString(rowset_val));
        // With write_schema_kv=true the saved rowset is expected to carry no embedded schema,
        // only index_id and schema_version.
        EXPECT_FALSE(saved_rowset.has_tablet_schema());
        EXPECT_EQ(saved_rowset.index_id(), index_id);
        EXPECT_EQ(saved_rowset.schema_version(), 2);
        // check get rowset response
        GetRowsetResponse get_rowset_res;
        get_rowset(meta_service.get(), table_id, index_id, partition_id, tablet_id, get_rowset_res);
        ASSERT_EQ(get_rowset_res.rowset_meta_size(), 2);
        ASSERT_EQ(get_rowset_res.rowset_meta(0).end_version(), 1); // [0-1]
        ASSERT_TRUE(get_rowset_res.rowset_meta(0).has_tablet_schema());
        EXPECT_EQ(get_rowset_res.rowset_meta(0).tablet_schema().schema_version(), 1);
        EXPECT_EQ(get_rowset_res.rowset_meta(0).index_id(), index_id);
        EXPECT_EQ(get_rowset_res.rowset_meta(0).schema_version(), 1);
        ASSERT_EQ(get_rowset_res.rowset_meta(1).end_version(), 2); // [2-2]
        ASSERT_TRUE(get_rowset_res.rowset_meta(1).has_tablet_schema());
        EXPECT_EQ(get_rowset_res.rowset_meta(1).tablet_schema().schema_version(), 2);
        EXPECT_EQ(get_rowset_res.rowset_meta(1).index_id(), index_id);
        EXPECT_EQ(get_rowset_res.rowset_meta(1).schema_version(), 2);
        // One inserted rowset with the fixed statistics from create_rowset, plus the initial one.
        ASSERT_TRUE(get_rowset_res.has_stats());
        EXPECT_EQ(get_rowset_res.stats().num_rows(), 100);
        EXPECT_EQ(get_rowset_res.stats().num_rowsets(), 2);
        EXPECT_EQ(get_rowset_res.stats().num_segments(), 1);
        EXPECT_EQ(get_rowset_res.stats().data_size(), 11000);
        EXPECT_EQ(get_rowset_res.stats().index_size(), 1000);
        EXPECT_EQ(get_rowset_res.stats().segment_size(), 10000);
    }
    // new MS read rowsets committed by both old and new MS
    //
    // Creates a tablet, inserts 10 rowsets with write_schema_kv=false and 15 more with
    // write_schema_kv=true (each with a randomly chosen schema version), then checks that
    // get_rowset returns every rowset with the schema version it was inserted with.
    //   label_base - labels for the inserted rowsets are label_base+1, label_base+2, ...
    //   arena      - when non-null, the GetRowsetResponse is allocated on it.
    //   schema     - when non-null, passed to every insert_rowset call and compared against
    //                the schema returned for rowset 10.
    auto insert_and_get_rowset = [&meta_service](int64_t table_id, int64_t index_id,
                                                 int64_t partition_id, int64_t tablet_id,
                                                 int label_base,
                                                 google::protobuf::Arena* arena = nullptr,
                                                 const TabletSchemaCloudPB* schema = nullptr) {
        config::write_schema_kv = false;
        std::mt19937 rng(std::chrono::system_clock::now().time_since_epoch().count());
        std::uniform_int_distribution<int> dist1(1, 4);
        std::uniform_int_distribution<int> dist2(2, 7);
        // schema_versions[i] is the schema version of the rowset expected at position i.
        std::vector<int> schema_versions {1};
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id,
                                              tablet_id, next_rowset_id(), schema_versions[0]));
        for (int i = 0; i < 10; ++i) {
            schema_versions.push_back(dist1(rng));
            ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), db_id,
                                                  std::to_string(++label_base), table_id, tablet_id,
                                                  schema_versions.back(), schema));
        }
        config::write_schema_kv = true;
        for (int i = 0; i < 15; ++i) {
            schema_versions.push_back(dist2(rng));
            ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), db_id,
                                                  std::to_string(++label_base), table_id, tablet_id,
                                                  schema_versions.back(), schema));
        }
        // check get rowset response
        auto get_rowset_res = google::protobuf::Arena::CreateMessage<GetRowsetResponse>(arena);
        // Without an arena the response is heap-allocated and must be freed here.
        DORIS_CLOUD_DEFER {
            if (!arena) delete get_rowset_res;
        };
        get_rowset(meta_service.get(), table_id, index_id, partition_id, tablet_id,
                   *get_rowset_res);
        // rowset_meta_size() and rowset_meta(i) work with int, so convert the vector size once
        // and stay in int instead of mixing signed and unsigned in the comparisons below.
        const int expected_rowsets = static_cast<int>(schema_versions.size());
        ASSERT_EQ(get_rowset_res->rowset_meta_size(), expected_rowsets);
        for (int i = 0; i < expected_rowsets; ++i) {
            auto& rowset = get_rowset_res->rowset_meta(i);
            ASSERT_EQ(rowset.end_version(), i + 1);
            ASSERT_TRUE(rowset.has_tablet_schema());
            EXPECT_EQ(rowset.tablet_schema().schema_version(), schema_versions[i]);
            EXPECT_EQ(rowset.index_id(), index_id);
            EXPECT_EQ(rowset.schema_version(), schema_versions[i]);
        }
        // 25 inserted rowsets with the fixed statistics from create_rowset, plus the initial one.
        ASSERT_TRUE(get_rowset_res->has_stats());
        EXPECT_EQ(get_rowset_res->stats().num_rows(), 2500);
        EXPECT_EQ(get_rowset_res->stats().num_rowsets(), 26);
        EXPECT_EQ(get_rowset_res->stats().num_segments(), 25);
        EXPECT_EQ(get_rowset_res->stats().data_size(), 275000);
        EXPECT_EQ(get_rowset_res->stats().index_size(), 25000);
        EXPECT_EQ(get_rowset_res->stats().segment_size(), 250000);
        if (schema != nullptr) {
            // Temporarily stamp 10086 on the returned schema while it is printed and compared,
            // then restore the version that was returned.
            auto schema_version = get_rowset_res->rowset_meta(10).schema_version();
            get_rowset_res->mutable_rowset_meta(10)->mutable_tablet_schema()->set_schema_version(
                    10086);
            std::cout << get_rowset_res->rowset_meta(10).tablet_schema().ShortDebugString()
                      << std::endl;
            std::cout << schema->ShortDebugString() << std::endl;
            EXPECT_EQ(get_rowset_res->rowset_meta(10).tablet_schema().column(2).type(),
                      schema->column(2).type());
            EXPECT_EQ(get_rowset_res->rowset_meta(10).tablet_schema().index(0).index_suffix_name(),
                      schema->index(0).index_suffix_name());
            EXPECT_EQ(get_rowset_res->rowset_meta(10).tablet_schema().index(1).index_id(),
                      schema->index(1).index_id());
            get_rowset_res->mutable_rowset_meta(10)->mutable_tablet_schema()->set_schema_version(
                    schema_version);
        }
    };
    insert_and_get_rowset(10031, 10032, 10033, 10034, 300);
    // use arena
    google::protobuf::Arena arena;
    insert_and_get_rowset(10041, 10042, 10043, 10044, 400, &arena);
    TabletSchemaCloudPB schema = getVariantSchema();
    insert_and_get_rowset(10051, 10052, 10053, 10054, 500, &arena, &schema);
}
// Checks that preparing/committing a rowset for a (txn, tablet) pair that already has a rowset
// reports ALREADY_EXISTED and returns the existing rowset meta together with its index_id,
// schema_version and tablet schema. Covered for an existing rowset written directly to the KV
// store and for one written through the RPCs with write_schema_kv=true.
TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
auto meta_service = get_meta_service();
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
// Deferred cleanup: clears the sync-point callbacks registered below.
DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->clear_all_call_backs();
};
// Answer the "get_instance_id" sync point with the test instance id.
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
ret->second = true;
});
sp->enable_processing();
// old MS commit rowset, new MS commit rowset again
{
constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004;
config::write_schema_kv = false;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id,
tablet_id, next_rowset_id(), 1));
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
// Write the already-committed rowset straight into the KV store under the tmp rowset key
// of txn 10005, bypassing the prepare/commit RPCs.
auto committed_rowset = create_rowset(10005, tablet_id, next_rowset_id(), 2, 2);
std::string tmp_rowset_key, tmp_rowset_val;
// 0:instance_id 1:txn_id 2:tablet_id
meta_rowset_tmp_key({instance_id, 10005, tablet_id}, &tmp_rowset_key);
ASSERT_TRUE(committed_rowset.SerializeToString(&tmp_rowset_val));
txn->put(tmp_rowset_key, tmp_rowset_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
CreateRowsetResponse res;
// A second rowset for the same txn and tablet, with a different rowset id.
auto new_rowset = create_rowset(10005, tablet_id, next_rowset_id(), 2, 2);
prepare_rowset(meta_service.get(), new_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res.has_existed_rowset_meta());
// The returned meta must be the one written first, not the new one.
EXPECT_EQ(res.existed_rowset_meta().rowset_id_v2(), committed_rowset.rowset_id_v2());
EXPECT_EQ(res.existed_rowset_meta().index_id(), index_id);
EXPECT_EQ(res.existed_rowset_meta().schema_version(), 2);
ASSERT_TRUE(res.existed_rowset_meta().has_tablet_schema());
EXPECT_EQ(res.existed_rowset_meta().tablet_schema().schema_version(), 2);
// Reuse the response object for the commit step.
res.Clear();
commit_rowset(meta_service.get(), new_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res.has_existed_rowset_meta());
EXPECT_EQ(res.existed_rowset_meta().rowset_id_v2(), committed_rowset.rowset_id_v2());
EXPECT_EQ(res.existed_rowset_meta().index_id(), index_id);
EXPECT_EQ(res.existed_rowset_meta().schema_version(), 2);
ASSERT_TRUE(res.existed_rowset_meta().has_tablet_schema());
EXPECT_EQ(res.existed_rowset_meta().tablet_schema().schema_version(), 2);
}
// new MS commit rowset, new MS commit rowset again
//
// Prepares and commits one rowset through the RPCs with write_schema_kv=true, then repeats
// both calls with a second rowset for the same txn/tablet and expects ALREADY_EXISTED.
// When `arena` is non-null the response object is allocated on it.
auto insert_existed_rowset = [&meta_service](int64_t table_id, int64_t index_id,
int64_t partition_id, int64_t tablet_id,
int64_t txn_id,
google::protobuf::Arena* arena = nullptr) {
config::write_schema_kv = true;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id,
tablet_id, next_rowset_id(), 1));
auto committed_rowset = create_rowset(txn_id, tablet_id, next_rowset_id(), 2, 2);
auto res = google::protobuf::Arena::CreateMessage<CreateRowsetResponse>(arena);
// Without an arena the response is heap-allocated and must be freed here.
DORIS_CLOUD_DEFER {
if (!arena) delete res;
};
prepare_rowset(meta_service.get(), committed_rowset, *res);
ASSERT_EQ(res->status().code(), MetaServiceCode::OK);
res->Clear();
commit_rowset(meta_service.get(), committed_rowset, *res);
ASSERT_EQ(res->status().code(), MetaServiceCode::OK);
res->Clear();
// A second rowset for the same txn and tablet, with a different rowset id.
auto new_rowset = create_rowset(txn_id, tablet_id, next_rowset_id(), 2, 2);
prepare_rowset(meta_service.get(), new_rowset, *res);
ASSERT_EQ(res->status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res->has_existed_rowset_meta());
EXPECT_EQ(res->existed_rowset_meta().rowset_id_v2(), committed_rowset.rowset_id_v2());
EXPECT_EQ(res->existed_rowset_meta().index_id(), index_id);
EXPECT_EQ(res->existed_rowset_meta().schema_version(), 2);
ASSERT_TRUE(res->existed_rowset_meta().has_tablet_schema());
EXPECT_EQ(res->existed_rowset_meta().tablet_schema().schema_version(), 2);
res->Clear();
commit_rowset(meta_service.get(), new_rowset, *res);
ASSERT_EQ(res->status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res->has_existed_rowset_meta());
EXPECT_EQ(res->existed_rowset_meta().rowset_id_v2(), committed_rowset.rowset_id_v2());
EXPECT_EQ(res->existed_rowset_meta().index_id(), index_id);
EXPECT_EQ(res->existed_rowset_meta().schema_version(), 2);
ASSERT_TRUE(res->existed_rowset_meta().has_tablet_schema());
EXPECT_EQ(res->existed_rowset_meta().tablet_schema().schema_version(), 2);
};
// Run once with a heap-allocated response and once with an arena-allocated one.
insert_existed_rowset(10011, 10012, 10013, 10014, 10015);
google::protobuf::Arena arena;
insert_existed_rowset(10021, 10022, 10023, 10024, 10025, &arena);
}
// With write_schema_kv enabled, creates one tablet while config::meta_schema_value_version is
// 0 and another while it is 1, and checks that get_tablet returns a schema with the expected
// schema_version for each.
TEST(SchemaKVTest, InsertExistedRowsetTest) {
    auto meta_service = get_meta_service();
    MetaServiceProxy* service = meta_service.get();
    // meta_service->resource_mgr().reset(); // Do not use resource manager

    auto sync_point = SyncPoint::get_instance();
    DORIS_CLOUD_DEFER {
        SyncPoint::get_instance()->clear_all_call_backs();
    };
    sync_point->set_call_back("get_instance_id", [&](auto&& args) {
        auto* id_and_flag = try_any_cast_ret<std::string>(args);
        id_and_flag->first = instance_id;
        id_and_flag->second = true;
    });
    sync_point->enable_processing();

    config::write_schema_kv = true;

    // Tablet 10004, schema version 1, written with meta_schema_value_version = 0.
    config::meta_schema_value_version = 0;
    ASSERT_NO_FATAL_FAILURE(create_tablet(service, 10001, 10002, 10003, 10004, next_rowset_id(), 1));
    check_get_tablet(service, 10004, 1);

    // Tablet 10005, schema version 2, written with meta_schema_value_version = 1.
    config::meta_schema_value_version = 1;
    ASSERT_NO_FATAL_FAILURE(create_tablet(service, 10001, 10002, 10003, 10005, next_rowset_id(), 2));
    check_get_tablet(service, 10005, 2);
}
// Fetches `tablet_id` through MetaService::get_tablet and verifies that the
// returned tablet meta carries a schema, that its schema_version equals
// `schema_version`, and that the schema has 10 columns.
// Uses gtest assertions, so a fatal failure only returns from this helper.
static void check_schema(MetaServiceProxy* meta_service, int64_t tablet_id,
                         int32_t schema_version) {
    GetTabletRequest request;
    request.set_tablet_id(tablet_id);

    brpc::Controller controller;
    GetTabletResponse response;
    meta_service->get_tablet(&controller, &request, &response, nullptr);

    ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << tablet_id;
    ASSERT_TRUE(response.has_tablet_meta()) << tablet_id;

    const auto& tablet_meta = response.tablet_meta();
    EXPECT_TRUE(tablet_meta.has_schema()) << tablet_id;
    EXPECT_EQ(tablet_meta.schema_version(), schema_version) << tablet_id;
    EXPECT_EQ(tablet_meta.schema().column_size(), 10) << tablet_id;
}
// Sends an UpdateTablet request that sets disable_auto_compaction = true for
// `tablet_id` and asserts that the meta service answers with OK.
static void update_tablet(MetaServiceProxy* meta_service, int64_t tablet_id) {
    UpdateTabletRequest request;
    auto* tablet_meta_info = request.add_tablet_meta_infos();
    tablet_meta_info->set_tablet_id(tablet_id);
    tablet_meta_info->set_disable_auto_compaction(true);

    brpc::Controller controller;
    UpdateTabletResponse response;
    meta_service->update_tablet(&controller, &request, &response, nullptr);
    ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << tablet_id;
}
// Checks that updating a tablet's disable_auto_compaction flag leaves its
// schema readable through get_tablet, under several combinations of
// config::write_schema_kv and config::meta_schema_value_version.
//
// Each case that changes a global config flag restores it with
// DORIS_CLOUD_DEFER. The previous code wrapped the restore lambda in
// std::make_unique<std::function<void()>>; destroying a std::function does not
// invoke it, so the restore never ran and an early return from
// ASSERT_NO_FATAL_FAILURE left the flags modified for later tests.
TEST(AlterSchemaKVTest, AlterDisableAutoCompactionTest) {
    // Case 1: config::write_schema_kv = true.
    {
        auto meta_service = get_meta_service();
        config::write_schema_kv = true;
        //config::meta_schema_value_version = 0;
        ASSERT_NO_FATAL_FAILURE(
                create_tablet(meta_service.get(), 10001, 10002, 10003, 10004, next_rowset_id(), 0));
        check_get_tablet(meta_service.get(), 10004, 0);
        check_schema(meta_service.get(), 10004, 0);
        //config::meta_schema_value_version = 1;
        ASSERT_NO_FATAL_FAILURE(
                create_tablet(meta_service.get(), 10001, 10002, 10003, 10005, next_rowset_id(), 2));
        check_get_tablet(meta_service.get(), 10005, 2);
        update_tablet(meta_service.get(), 10005);
        check_schema(meta_service.get(), 10005, 2);
    }
    // Case 2: config::write_schema_kv = false.
    {
        auto meta_service = get_meta_service();
        config::write_schema_kv = false;
        // Restore the flag on every exit path of this scope.
        DORIS_CLOUD_DEFER {
            config::write_schema_kv = true;
        };
        //config::meta_schema_value_version = 0;
        ASSERT_NO_FATAL_FAILURE(
                create_tablet(meta_service.get(), 10001, 10002, 10003, 10004, next_rowset_id(), 0));
        check_get_tablet(meta_service.get(), 10004, 0);
        check_schema(meta_service.get(), 10004, 0);
        //config::meta_schema_value_version = 1;
        ASSERT_NO_FATAL_FAILURE(
                create_tablet(meta_service.get(), 10001, 10002, 10003, 10005, next_rowset_id(), 2));
        check_get_tablet(meta_service.get(), 10005, 2);
        update_tablet(meta_service.get(), 10005);
        check_schema(meta_service.get(), 10005, 2);
    }
    // Case 3: create the tablets with config::write_schema_kv = false, then
    // switch it to true before updating the tablet.
    {
        auto meta_service = get_meta_service();
        config::write_schema_kv = false;
        // Restore the flag on every exit path of this scope.
        DORIS_CLOUD_DEFER {
            config::write_schema_kv = true;
        };
        //config::meta_schema_value_version = 0;
        ASSERT_NO_FATAL_FAILURE(
                create_tablet(meta_service.get(), 10001, 10002, 10003, 10004, next_rowset_id(), 0));
        check_get_tablet(meta_service.get(), 10004, 0);
        check_schema(meta_service.get(), 10004, 0);
        //config::meta_schema_value_version = 1;
        ASSERT_NO_FATAL_FAILURE(
                create_tablet(meta_service.get(), 10001, 10002, 10003, 10005, next_rowset_id(), 2));
        check_get_tablet(meta_service.get(), 10005, 2);
        config::write_schema_kv = true;
        update_tablet(meta_service.get(), 10005);
        check_schema(meta_service.get(), 10005, 2);
    }
    // Case 4: same as case 3, but the first tablet is created with
    // config::meta_schema_value_version = 0 and the second with 1.
    {
        auto meta_service = get_meta_service();
        config::write_schema_kv = false;
        // Restore both flags on every exit path of this scope.
        DORIS_CLOUD_DEFER {
            config::write_schema_kv = true;
            config::meta_schema_value_version = 1;
        };
        config::meta_schema_value_version = 0;
        ASSERT_NO_FATAL_FAILURE(
                create_tablet(meta_service.get(), 10001, 10002, 10003, 10004, next_rowset_id(), 0));
        check_get_tablet(meta_service.get(), 10004, 0);
        check_schema(meta_service.get(), 10004, 0);
        config::meta_schema_value_version = 1;
        ASSERT_NO_FATAL_FAILURE(
                create_tablet(meta_service.get(), 10001, 10002, 10003, 10005, next_rowset_id(), 2));
        check_get_tablet(meta_service.get(), 10005, 2);
        config::write_schema_kv = true;
        update_tablet(meta_service.get(), 10005);
        check_schema(meta_service.get(), 10005, 2);
    }
}
} // namespace doris::cloud