blob: 54733a29a65c94e8fa6dc4ecc68d851916863dde [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <brpc/controller.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <google/protobuf/arena.h>
#include <gtest/gtest.h>
#include <random>
#include "common/config.h"
#include "common/defer.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_schema.h"
#include "meta-store/blob_message.h"
#include "meta-store/codec.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
// Instance id handed back by the "get_instance_id" sync-point callbacks in this file; it is
// also the first component of every meta key the tests below build by hand.
static std::string instance_id{"schema_kv_test"};
namespace doris::cloud {
// Factory for the meta service under test. Only declared here; the definition lives elsewhere
// (presumably another translation unit of the same test binary).
extern std::unique_ptr<MetaServiceProxy> get_meta_service();
// Returns a fresh rowset id on every call: "1", "2", "3", ... The counter is a function-local
// static, so ids keep increasing across all tests in this file and never repeat within a run.
static std::string next_rowset_id() {
    static int counter = 0;
    counter += 1;
    return std::to_string(counter);
}
// Stamps `schema` with `schema_version` and appends ten INT columns whose unique ids are
// 20000..20009.
static void fill_schema(doris::TabletSchemaCloudPB* schema, int32_t schema_version) {
    constexpr int kNumColumns = 10;
    constexpr int32_t kFirstUniqueId = 20000;
    schema->set_schema_version(schema_version);
    for (int col = 0; col < kNumColumns; ++col) {
        auto* column = schema->add_column();
        column->set_unique_id(kFirstUniqueId + col);
        column->set_type("INT");
    }
}
// Appends one tablet meta to `req`. The tablet carries a ten-column schema (see fill_schema)
// at `schema_version`, plus a single initial rowset `rowset_id` that covers versions [0-1]
// and embeds a copy of that same schema.
static void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id,
                       int64_t partition_id, int64_t tablet_id, const std::string& rowset_id,
                       int32_t schema_version) {
    auto* tablet_meta = req.add_tablet_metas();
    tablet_meta->set_table_id(table_id);
    tablet_meta->set_index_id(index_id);
    tablet_meta->set_partition_id(partition_id);
    tablet_meta->set_tablet_id(tablet_id);

    auto* tablet_schema = tablet_meta->mutable_schema();
    fill_schema(tablet_schema, schema_version);

    auto* init_rowset = tablet_meta->add_rs_metas();
    init_rowset->set_rowset_id(0); // required
    init_rowset->set_rowset_id_v2(rowset_id);
    init_rowset->set_start_version(0);
    init_rowset->set_end_version(1);
    init_rowset->mutable_tablet_schema()->CopyFrom(*tablet_schema);
}
// Sends a CreateTablets request for a single tablet (built by add_tablet) in `db_id` and
// asserts that the meta service replies OK. On failure, both the tablet id and the server's
// status message are reported, matching insert_rowset's diagnostics.
static void create_tablet(MetaServiceProxy* meta_service, int64_t db_id, int64_t table_id,
                          int64_t index_id, int64_t partition_id, int64_t tablet_id,
                          const std::string& rowset_id, int32_t schema_version) {
    brpc::Controller cntl;
    CreateTabletsRequest req;
    CreateTabletsResponse res;
    req.set_db_id(db_id);
    add_tablet(req, table_id, index_id, partition_id, tablet_id, rowset_id, schema_version);
    meta_service->create_tablets(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
            << "tablet_id=" << tablet_id << ", msg=" << res.status().msg();
}
// Fetches the rowset metas of the given tablet into `res` and asserts the reply is OK.
// The request uses start_version 0 and end_version -1 (presumably "up to the latest version"
// — TODO confirm). Compaction counters are 0 and the cumulative point is 2. On failure,
// both the tablet id and the server's status message are reported.
static void get_rowset(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id,
                       int64_t partition_id, int64_t tablet_id, GetRowsetResponse& res) {
    brpc::Controller cntl;
    GetRowsetRequest req;
    auto* tablet_idx = req.mutable_idx();
    tablet_idx->set_table_id(table_id);
    tablet_idx->set_index_id(index_id);
    tablet_idx->set_partition_id(partition_id);
    tablet_idx->set_tablet_id(tablet_id);
    req.set_start_version(0);
    req.set_end_version(-1);
    req.set_cumulative_compaction_cnt(0);
    req.set_base_compaction_cnt(0);
    req.set_cumulative_point(2);
    meta_service->get_rowset(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
            << "tablet_id=" << tablet_id << ", msg=" << res.status().msg();
}
// Calls get_tablet for `tablet_id` and checks that the reply is OK, that it carries a tablet
// meta with a schema, and that the meta's schema_version equals `schema_version`.
// (The original had a stray ';' after the body, an empty declaration flagged by
// -Wextra-semi; it is removed here.)
static void check_get_tablet(MetaServiceProxy* meta_service, int64_t tablet_id,
                             int32_t schema_version) {
    brpc::Controller cntl;
    GetTabletRequest req;
    GetTabletResponse res;
    req.set_tablet_id(tablet_id);
    meta_service->get_tablet(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
            << "tablet_id=" << tablet_id << ", msg=" << res.status().msg();
    ASSERT_TRUE(res.has_tablet_meta()) << tablet_id;
    EXPECT_TRUE(res.tablet_meta().has_schema()) << tablet_id;
    EXPECT_EQ(res.tablet_meta().schema_version(), schema_version) << tablet_id;
}
// Verifies how tablet metas are persisted and served depending on config::write_schema_kv
// (MS = meta service):
//  - new MS, write_schema_kv=false: the saved tablet meta and its initial rowset keep an
//    embedded schema, so an old MS can still read them;
//  - a tablet meta written directly into txn_kv with an embedded schema (as an old MS would)
//    is served by get_tablet;
//  - new MS, write_schema_kv=true: the saved tablet meta and rowset carry only the schema
//    version (plus index_id for the rowset), while get_tablet / get_rowset still return the
//    schema; the same holds for a batch of tablets over two indexes and several versions.
// NOTE: sets config::write_schema_kv and does not restore it.
TEST(DetachSchemaKVTest, TabletTest) {
auto meta_service = get_meta_service();
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
// Drop all sync-point callbacks registered below when the test exits.
DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->clear_all_call_backs();
};
// Answer the "get_instance_id" sync point with `instance_id` (presumably so that requests are
// attributed to this test's instance — TODO confirm against the meta service).
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
ret->second = true;
});
sp->enable_processing();
// new MS write with write_schema_kv=false, old MS read
{
constexpr auto db_id = 1, table_id = 10001, index_id = 10002, partition_id = 10003,
tablet_id = 10004;
config::write_schema_kv = false;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
partition_id, tablet_id, next_rowset_id(), 1));
// check saved values in txn_kv
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string tablet_key, tablet_val;
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}, &tablet_key);
ASSERT_EQ(txn->get(tablet_key, &tablet_val), TxnErrorCode::TXN_OK);
doris::TabletMetaCloudPB saved_tablet;
ASSERT_TRUE(saved_tablet.ParseFromString(tablet_val));
// The schema is still embedded in the stored tablet meta.
EXPECT_TRUE(saved_tablet.has_schema());
std::string rowset_key, rowset_val;
// The initial rowset built by add_tablet spans [0-1]; it is looked up with version 1.
meta_rowset_key({instance_id, tablet_id, 1}, &rowset_key);
ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK);
doris::RowsetMetaCloudPB saved_rowset;
ASSERT_TRUE(saved_rowset.ParseFromString(rowset_val));
// ...and the stored rowset still embeds its tablet schema.
EXPECT_TRUE(saved_rowset.has_tablet_schema());
}
// old MS write, new MS read
{
constexpr auto table_id = 10011, index_id = 10012, partition_id = 10013, tablet_id = 10014;
// Hand-write a tablet meta that embeds a one-column schema (version 1), plus its tablet
// index entry, bypassing create_tablets.
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
doris::TabletMetaCloudPB saved_tablet;
saved_tablet.set_table_id(table_id);
saved_tablet.set_index_id(index_id);
saved_tablet.set_partition_id(partition_id);
saved_tablet.set_tablet_id(tablet_id);
saved_tablet.mutable_schema()->set_schema_version(1);
auto column = saved_tablet.mutable_schema()->add_column();
column->set_unique_id(30001);
column->set_type("INT");
std::string tablet_key, tablet_val;
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}, &tablet_key);
ASSERT_TRUE(saved_tablet.SerializeToString(&tablet_val));
txn->put(tablet_key, tablet_val);
TabletIndexPB saved_tablet_idx;
saved_tablet_idx.set_table_id(table_id);
saved_tablet_idx.set_index_id(index_id);
saved_tablet_idx.set_partition_id(partition_id);
saved_tablet_idx.set_tablet_id(tablet_id);
std::string tablet_idx_key, tablet_idx_val;
meta_tablet_idx_key({instance_id, tablet_id}, &tablet_idx_key);
ASSERT_TRUE(saved_tablet_idx.SerializeToString(&tablet_idx_val));
txn->put(tablet_idx_key, tablet_idx_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
// check get tablet response
check_get_tablet(meta_service.get(), tablet_id, 1);
}
// Asserts that the tablet meta stored under the given ids has no embedded schema and records
// `schema_version` instead (the layout written with write_schema_kv=true).
auto check_new_saved_tablet_val = [](Transaction* txn, int64_t table_id, int64_t index_id,
int64_t partition_id, int64_t tablet_id,
int32_t schema_version) {
std::string tablet_key, tablet_val;
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}, &tablet_key);
ASSERT_EQ(txn->get(tablet_key, &tablet_val), TxnErrorCode::TXN_OK);
doris::TabletMetaCloudPB saved_tablet;
ASSERT_TRUE(saved_tablet.ParseFromString(tablet_val));
EXPECT_FALSE(saved_tablet.has_schema()) << tablet_id;
EXPECT_EQ(saved_tablet.schema_version(), schema_version) << tablet_id;
};
// new MS write with write_schema_kv=true, new MS read
{
constexpr auto db_id = 1, table_id = 10021, index_id = 10022, partition_id = 10023,
tablet_id = 10024;
config::write_schema_kv = true;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
partition_id, tablet_id, next_rowset_id(), 1));
// check saved values in txn_kv
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
check_new_saved_tablet_val(txn.get(), table_id, index_id, partition_id, tablet_id, 1);
std::string rowset_key, rowset_val;
meta_rowset_key({instance_id, tablet_id, 1}, &rowset_key);
ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK);
doris::RowsetMetaCloudPB saved_rowset;
ASSERT_TRUE(saved_rowset.ParseFromString(rowset_val));
// The stored rowset drops its schema but keeps index_id and schema_version.
EXPECT_FALSE(saved_rowset.has_tablet_schema());
EXPECT_EQ(saved_rowset.index_id(), index_id);
EXPECT_EQ(saved_rowset.schema_version(), 1);
// check get tablet response
check_get_tablet(meta_service.get(), tablet_id, 1);
// check get rowset response: the schema is attached again in the reply
GetRowsetResponse get_rowset_res;
get_rowset(meta_service.get(), table_id, index_id, partition_id, tablet_id, get_rowset_res);
ASSERT_EQ(get_rowset_res.rowset_meta_size(), 1);
EXPECT_TRUE(get_rowset_res.rowset_meta(0).has_tablet_schema());
EXPECT_EQ(get_rowset_res.rowset_meta(0).index_id(), index_id);
EXPECT_EQ(get_rowset_res.rowset_meta(0).schema_version(), 1);
// Only the initial rowset exists and add_tablet sets no size/row fields on it.
ASSERT_TRUE(get_rowset_res.has_stats());
EXPECT_EQ(get_rowset_res.stats().num_rows(), 0);
EXPECT_EQ(get_rowset_res.stats().num_rowsets(), 1);
EXPECT_EQ(get_rowset_res.stats().num_segments(), 0);
EXPECT_EQ(get_rowset_res.stats().data_size(), 0);
EXPECT_EQ(get_rowset_res.stats().index_size(), 0);
EXPECT_EQ(get_rowset_res.stats().segment_size(), 0);
}
// new MS batch create tablets with write_schema_kv=true
// Nine tablets in one request: six on index 10032 and three on index 10034, with several
// tablets sharing each schema version.
{
config::write_schema_kv = true;
brpc::Controller cntl;
CreateTabletsRequest req;
CreateTabletsResponse res;
req.set_db_id(1);
add_tablet(req, 10031, 10032, 10033, 100031, next_rowset_id(), 1);
add_tablet(req, 10031, 10032, 10033, 100032, next_rowset_id(), 2);
add_tablet(req, 10031, 10032, 10033, 100033, next_rowset_id(), 2);
add_tablet(req, 10031, 10032, 10033, 100034, next_rowset_id(), 3);
add_tablet(req, 10031, 10032, 10033, 100035, next_rowset_id(), 3);
add_tablet(req, 10031, 10032, 10033, 100036, next_rowset_id(), 3);
add_tablet(req, 10031, 10034, 10033, 100037, next_rowset_id(), 1);
add_tablet(req, 10031, 10034, 10033, 100038, next_rowset_id(), 2);
add_tablet(req, 10031, 10034, 10033, 100039, next_rowset_id(), 2);
meta_service->create_tablets(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
// check saved values in txn_kv
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100031, 1);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100032, 2);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100033, 2);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100034, 3);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100035, 3);
check_new_saved_tablet_val(txn.get(), 10031, 10032, 10033, 100036, 3);
check_new_saved_tablet_val(txn.get(), 10031, 10034, 10033, 100037, 1);
check_new_saved_tablet_val(txn.get(), 10031, 10034, 10033, 100038, 2);
check_new_saved_tablet_val(txn.get(), 10031, 10034, 10033, 100039, 2);
// check get tablet response
check_get_tablet(meta_service.get(), 100031, 1);
check_get_tablet(meta_service.get(), 100032, 2);
check_get_tablet(meta_service.get(), 100033, 2);
check_get_tablet(meta_service.get(), 100034, 3);
check_get_tablet(meta_service.get(), 100035, 3);
check_get_tablet(meta_service.get(), 100036, 3);
check_get_tablet(meta_service.get(), 100037, 1);
check_get_tablet(meta_service.get(), 100038, 2);
check_get_tablet(meta_service.get(), 100039, 2);
}
}
// Writes a ten-column schema through both put_schema_kv (meta schema key, read back with
// blob_get) and put_versioned_schema_kv (versioned key, read back with document_get).
// Schema version 0 is written twice, to cover re-writing an existing key, and then version 1
// is written once. After every commit, both keys are read back in a fresh transaction and
// decoded to check the stored schema version. (The original fetched the blob value but never
// decoded it.)
// NOTE: sets config::meta_schema_value_version and does not restore it.
TEST(DetachSchemaKVTest, PutSchemaKvTest) {
    config::meta_schema_value_version = 1;
    auto meta_service = get_meta_service();
    int64_t index_id = 14221;
    int64_t schema_version = 0;
    doris::TabletSchemaCloudPB schema;
    fill_schema(&schema, schema_version);

    // Puts `schema` under both keys for the current `schema_version` in one transaction,
    // commits it, then reads both keys back in a new transaction and checks the version.
    auto put_and_verify = [&]() {
        std::string key = meta_schema_key({instance_id, index_id, schema_version});
        std::string versioned_key =
                versioned::meta_schema_key({instance_id, index_id, schema_version});
        MetaServiceCode code = MetaServiceCode::OK;
        std::string msg;
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        put_schema_kv(code, msg, txn.get(), key, schema);
        ASSERT_EQ(code, MetaServiceCode::OK) << msg;
        put_versioned_schema_kv(code, msg, txn.get(), versioned_key, schema);
        ASSERT_EQ(code, MetaServiceCode::OK) << msg;
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        // verify the tablet schema is written and decodes back to the same version
        ValueBuf buf;
        ASSERT_EQ(cloud::blob_get(txn.get(), key, &buf), TxnErrorCode::TXN_OK);
        doris::TabletSchemaCloudPB blob_schema;
        ASSERT_TRUE(parse_schema_value(buf, &blob_schema));
        EXPECT_EQ(blob_schema.schema_version(), schema_version);
        // verify the versioned tablet schema is written
        doris::TabletSchemaCloudPB saved_schema;
        ASSERT_EQ(document_get(txn.get(), versioned_key, &saved_schema), TxnErrorCode::TXN_OK);
        EXPECT_EQ(saved_schema.schema_version(), schema_version);
    };

    // Writing the same schema version a second time must also succeed.
    for (int i = 0; i < 2; ++i) {
        ASSERT_NO_FATAL_FAILURE(put_and_verify());
    }

    // put new schema version
    schema_version = 1;
    schema.set_schema_version(schema_version);
    ASSERT_NO_FATAL_FAILURE(put_and_verify());
}
// Begins a transaction labelled `label` on `table_id` in `db_id`, with timeout_ms 36000, and
// returns its id through `txn_id`. Asserts that the reply is OK and carries a txn id. On
// failure, the label and the server's status message are reported.
static void begin_txn(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label,
                      int64_t table_id, int64_t& txn_id) {
    brpc::Controller cntl;
    BeginTxnRequest req;
    BeginTxnResponse res;
    auto* txn_info = req.mutable_txn_info();
    txn_info->set_db_id(db_id);
    txn_info->set_label(label);
    txn_info->add_table_ids(table_id);
    txn_info->set_timeout_ms(36000);
    meta_service->begin_txn(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
            << label << ", msg=" << res.status().msg();
    ASSERT_TRUE(res.has_txn_id()) << label;
    txn_id = res.txn_id();
}
// Commits transaction `txn_id` in `db_id` and asserts the reply is OK. `label` is used only
// in the failure message, together with the server's status message.
static void commit_txn(MetaServiceProxy* meta_service, int64_t db_id, int64_t txn_id,
                       const std::string& label) {
    brpc::Controller cntl;
    CommitTxnRequest req;
    CommitTxnResponse res;
    req.set_db_id(db_id);
    req.set_txn_id(txn_id);
    meta_service->commit_txn(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
            << label << ", msg=" << res.status().msg();
}
// Builds (but does not send) a rowset meta for `tablet_id` written by `txn_id`: 100 rows in
// one segment, with data_disk_size 10000, index_disk_size 1000 and total_disk_size 11000.
// If `version` is positive, the rowset is pinned to [version-version]; otherwise no version
// range is set. If `schema` is given, it is copied into the rowset's tablet schema. Either
// way, the tablet schema ends up stamped with `schema_version`.
static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id,
                                              const std::string& rowset_id, int32_t schema_version,
                                              int64_t version = -1,
                                              const TabletSchemaCloudPB* schema = nullptr) {
    doris::RowsetMetaCloudPB rowset;
    rowset.set_rowset_id(0); // required
    rowset.set_rowset_id_v2(rowset_id);
    rowset.set_tablet_id(tablet_id);
    rowset.set_txn_id(txn_id);

    rowset.set_num_rows(100);
    rowset.set_num_segments(1);
    rowset.set_data_disk_size(10000);
    rowset.set_index_disk_size(1000);
    rowset.set_total_disk_size(11000);

    if (version > 0) {
        rowset.set_start_version(version);
        rowset.set_end_version(version);
    }
    rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK

    auto* tablet_schema = rowset.mutable_tablet_schema();
    if (schema != nullptr) {
        tablet_schema->CopyFrom(*schema);
    }
    tablet_schema->set_schema_version(schema_version);
    return rowset;
}
// Sends `rowset` to MetaService::prepare_rowset and leaves the reply in `res`.
// The request is created on the same protobuf arena as `res` when `res` is arena-owned.
// Otherwise it is heap-allocated and freed on scope exit by a defer guard. The original
// deleted it manually after the call, which leaked if the call unwound.
static void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
                           CreateRowsetResponse& res) {
    brpc::Controller cntl;
    auto* arena = res.GetArena();
    auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
    // Arena-owned messages are released with the arena; only heap allocations are ours.
    DORIS_CLOUD_DEFER {
        if (arena == nullptr) delete req;
    };
    req->mutable_rowset_meta()->CopyFrom(rowset);
    meta_service->prepare_rowset(&cntl, req, &res, nullptr);
}
// Sends `rowset` to MetaService::commit_rowset and leaves the reply in `res`.
// The request is created on the same protobuf arena as `res` when `res` is arena-owned.
// Otherwise it is heap-allocated and freed on scope exit by a defer guard. The original
// deleted it manually after the call, which leaked if the call unwound.
static void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
                          CreateRowsetResponse& res) {
    brpc::Controller cntl;
    auto* arena = res.GetArena();
    auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
    // Arena-owned messages are released with the arena; only heap allocations are ours.
    DORIS_CLOUD_DEFER {
        if (arena == nullptr) delete req;
    };
    req->mutable_rowset_meta()->CopyFrom(rowset);
    meta_service->commit_rowset(&cntl, req, &res, nullptr);
}
// Runs a complete single-rowset load on `tablet_id`:
//  1. begin a txn labelled `label`;
//  2. prepare and commit a fresh rowset stamped with `schema_version`;
//  3. commit the txn.
// When `schema` is given, the rowset carries a copy of it and has_variant_type_in_schema is
// set to true (false otherwise). Every step is asserted to succeed.
static void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label,
                          int64_t table_id, int64_t tablet_id, int32_t schema_version,
                          const TabletSchemaCloudPB* schema = nullptr) {
    int64_t txn_id = 0;
    ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service, db_id, label, table_id, txn_id));

    // version = -1: create_rowset leaves the rowset's version range unset.
    auto rowset = create_rowset(txn_id, tablet_id, next_rowset_id(), schema_version, -1, schema);
    const bool has_variant_schema = (schema != nullptr);
    rowset.set_has_variant_type_in_schema(has_variant_schema);

    CreateRowsetResponse res;
    prepare_rowset(meta_service, rowset, res);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;

    res.Clear();
    ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service, rowset, res));
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label << ", msg=" << res.status().msg();

    commit_txn(meta_service, db_id, txn_id, label);
}
// Builds a schema (version 10086) containing a VARIANT column, used to check that rowset
// schemas with variant columns survive a round trip through the meta service.
// Columns, in order:
//  - an INT column (unique_id -1);
//  - a DOUBLE column (unique_id -1);
//  - the VARIANT column (unique_id 10), which has one DOUBLE sparse column (unique_id -1).
// Indexes: id 111 with suffix name "aaabbbccc", and id 222 without a suffix.
static TabletSchemaCloudPB getVariantSchema() {
    TabletSchemaCloudPB schema;
    schema.set_schema_version(10086);

    auto* int_col = schema.add_column();
    int_col->set_type("INT");
    int_col->set_unique_id(-1);

    auto* double_col = schema.add_column();
    double_col->set_type("DOUBLE");
    double_col->set_unique_id(-1);

    auto* variant_col = schema.add_column();
    variant_col->set_type("VARIANT");
    variant_col->set_unique_id(10);
    auto* sparse_col = variant_col->add_sparse_columns();
    sparse_col->set_type("DOUBLE");
    sparse_col->set_unique_id(-1);

    auto* named_index = schema.add_index();
    named_index->set_index_id(111);
    named_index->set_index_suffix_name("aaabbbccc");
    schema.add_index()->set_index_id(222);

    return schema;
}
// Checks how rowset metas are persisted and served (MS = meta service):
//  1. new MS, write_schema_kv=false: a committed rowset keeps its embedded tablet schema;
//  2. a rowset written straight into txn_kv (as an old MS would) is returned by get_rowset
//     with index_id and schema_version filled in;
//  3. new MS, write_schema_kv=true: the stored rowset drops its schema, but get_rowset
//     returns it again along with index_id, schema_version and stats;
//  4. a mix of old- and new-format rowsets with random schema versions, run with and without
//     an arena-allocated response, and once with a variant schema.
// NOTE: sets config::write_schema_kv and does not restore it.
TEST(DetachSchemaKVTest, RowsetTest) {
    auto meta_service = get_meta_service();
    // meta_service->resource_mgr().reset(); // Do not use resource manager
    auto sp = SyncPoint::get_instance();
    DORIS_CLOUD_DEFER {
        SyncPoint::get_instance()->clear_all_call_backs();
    };
    sp->set_call_back("get_instance_id", [&](auto&& args) {
        auto* ret = try_any_cast_ret<std::string>(args);
        ret->first = instance_id;
        ret->second = true;
    });
    sp->enable_processing();
    constexpr int64_t db_id = 10000;
    // new MS write with write_schema_kv=false, old MS read
    {
        constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004;
        config::write_schema_kv = false;
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
                                              partition_id, tablet_id, next_rowset_id(), 1));
        ASSERT_NO_FATAL_FAILURE(
                insert_rowset(meta_service.get(), db_id, "101", table_id, tablet_id, 2)); // [2-2]
        // check saved values in txn_kv
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string rowset_key, rowset_val;
        meta_rowset_key({instance_id, tablet_id, 2}, &rowset_key); // [2-2]
        ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB saved_rowset;
        ASSERT_TRUE(saved_rowset.ParseFromString(rowset_val));
        ASSERT_TRUE(saved_rowset.has_tablet_schema());
        EXPECT_EQ(saved_rowset.tablet_schema().schema_version(), 2);
    }
    // old MS write, new MS read
    {
        constexpr auto table_id = 10011, index_id = 10012, partition_id = 10013, tablet_id = 10014;
        config::write_schema_kv = false;
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
                                              partition_id, tablet_id, next_rowset_id(), 1));
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        auto saved_rowset = create_rowset(10015, tablet_id, next_rowset_id(), 2, 2);
        std::string rowset_key, rowset_val;
        meta_rowset_key({instance_id, tablet_id, 2}, &rowset_key); // version=[2-2]
        ASSERT_TRUE(saved_rowset.SerializeToString(&rowset_val));
        txn->put(rowset_key, rowset_val);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
        // check get rowset response
        GetRowsetResponse get_rowset_res;
        ASSERT_NO_FATAL_FAILURE(get_rowset(meta_service.get(), table_id, index_id, partition_id,
                                           tablet_id, get_rowset_res));
        ASSERT_EQ(get_rowset_res.rowset_meta_size(), 2);
        ASSERT_EQ(get_rowset_res.rowset_meta(0).end_version(), 1); // [0-1]
        ASSERT_TRUE(get_rowset_res.rowset_meta(0).has_tablet_schema());
        EXPECT_EQ(get_rowset_res.rowset_meta(0).tablet_schema().schema_version(), 1);
        EXPECT_EQ(get_rowset_res.rowset_meta(0).index_id(), index_id);
        EXPECT_EQ(get_rowset_res.rowset_meta(0).schema_version(), 1);
        ASSERT_EQ(get_rowset_res.rowset_meta(1).end_version(), 2); // [2-2]
        ASSERT_TRUE(get_rowset_res.rowset_meta(1).has_tablet_schema());
        EXPECT_EQ(get_rowset_res.rowset_meta(1).tablet_schema().schema_version(), 2);
        EXPECT_EQ(get_rowset_res.rowset_meta(1).index_id(), index_id);
        EXPECT_EQ(get_rowset_res.rowset_meta(1).schema_version(), 2);
    }
    // new MS write with write_schema_kv=true, new MS read
    {
        constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024;
        config::write_schema_kv = true;
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
                                              partition_id, tablet_id, next_rowset_id(), 1));
        ASSERT_NO_FATAL_FAILURE(
                insert_rowset(meta_service.get(), db_id, "201", table_id, tablet_id, 2)); // [2-2]
        // check saved values in txn_kv
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string rowset_key, rowset_val;
        meta_rowset_key({instance_id, tablet_id, 2}, &rowset_key); // [2-2]
        ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB saved_rowset;
        ASSERT_TRUE(saved_rowset.ParseFromString(rowset_val));
        EXPECT_FALSE(saved_rowset.has_tablet_schema());
        EXPECT_EQ(saved_rowset.index_id(), index_id);
        EXPECT_EQ(saved_rowset.schema_version(), 2);
        // check get rowset response
        GetRowsetResponse get_rowset_res;
        ASSERT_NO_FATAL_FAILURE(get_rowset(meta_service.get(), table_id, index_id, partition_id,
                                           tablet_id, get_rowset_res));
        ASSERT_EQ(get_rowset_res.rowset_meta_size(), 2);
        ASSERT_EQ(get_rowset_res.rowset_meta(0).end_version(), 1); // [0-1]
        ASSERT_TRUE(get_rowset_res.rowset_meta(0).has_tablet_schema());
        EXPECT_EQ(get_rowset_res.rowset_meta(0).tablet_schema().schema_version(), 1);
        EXPECT_EQ(get_rowset_res.rowset_meta(0).index_id(), index_id);
        EXPECT_EQ(get_rowset_res.rowset_meta(0).schema_version(), 1);
        ASSERT_EQ(get_rowset_res.rowset_meta(1).end_version(), 2); // [2-2]
        ASSERT_TRUE(get_rowset_res.rowset_meta(1).has_tablet_schema());
        EXPECT_EQ(get_rowset_res.rowset_meta(1).tablet_schema().schema_version(), 2);
        EXPECT_EQ(get_rowset_res.rowset_meta(1).index_id(), index_id);
        EXPECT_EQ(get_rowset_res.rowset_meta(1).schema_version(), 2);
        ASSERT_TRUE(get_rowset_res.has_stats());
        EXPECT_EQ(get_rowset_res.stats().num_rows(), 100);
        EXPECT_EQ(get_rowset_res.stats().num_rowsets(), 2);
        EXPECT_EQ(get_rowset_res.stats().num_segments(), 1);
        EXPECT_EQ(get_rowset_res.stats().data_size(), 11000);
        EXPECT_EQ(get_rowset_res.stats().index_size(), 1000);
        EXPECT_EQ(get_rowset_res.stats().segment_size(), 10000);
    }
    // new MS read rowsets committed by both old and new MS
    // The lambda creates a tablet, then commits 10 rowsets with write_schema_kv=false and 15
    // with write_schema_kv=true, each with a random schema version. It checks that get_rowset
    // returns all 26 rowsets (including the initial [0-1] one) with their tablet schema,
    // index_id and schema_version, plus the aggregated stats. If `arena` is given, the
    // response is allocated on it. If `schema` is given, it is embedded in every inserted
    // rowset and spot-checked on one of the old-format rowsets.
    auto insert_and_get_rowset = [&meta_service](int64_t table_id, int64_t index_id,
                                                 int64_t partition_id, int64_t tablet_id,
                                                 int label_base,
                                                 google::protobuf::Arena* arena = nullptr,
                                                 const TabletSchemaCloudPB* schema = nullptr) {
        config::write_schema_kv = false;
        // The versions are random; attach the seed to every failure so it can be reproduced.
        const auto seed = static_cast<std::mt19937::result_type>(
                std::chrono::system_clock::now().time_since_epoch().count());
        SCOPED_TRACE("rng seed=" + std::to_string(seed));
        std::mt19937 rng(seed);
        std::uniform_int_distribution<int> dist1(1, 4);
        std::uniform_int_distribution<int> dist2(2, 7);
        std::vector<int> schema_versions {1};
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
                                              partition_id, tablet_id, next_rowset_id(),
                                              schema_versions[0]));
        for (int i = 0; i < 10; ++i) {
            schema_versions.push_back(dist1(rng));
            ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), db_id,
                                                  std::to_string(++label_base), table_id, tablet_id,
                                                  schema_versions.back(), schema));
        }
        config::write_schema_kv = true;
        for (int i = 0; i < 15; ++i) {
            schema_versions.push_back(dist2(rng));
            ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), db_id,
                                                  std::to_string(++label_base), table_id, tablet_id,
                                                  schema_versions.back(), schema));
        }
        // check get rowset response
        auto get_rowset_res = google::protobuf::Arena::CreateMessage<GetRowsetResponse>(arena);
        DORIS_CLOUD_DEFER {
            if (!arena) delete get_rowset_res;
        };
        ASSERT_NO_FATAL_FAILURE(get_rowset(meta_service.get(), table_id, index_id, partition_id,
                                           tablet_id, *get_rowset_res));
        // Compare as int on both sides to avoid a signed/unsigned comparison.
        const int expected_rowsets = static_cast<int>(schema_versions.size());
        ASSERT_EQ(get_rowset_res->rowset_meta_size(), expected_rowsets);
        for (int i = 0; i < expected_rowsets; ++i) {
            auto& rowset = get_rowset_res->rowset_meta(i);
            ASSERT_EQ(rowset.end_version(), i + 1);
            ASSERT_TRUE(rowset.has_tablet_schema());
            EXPECT_EQ(rowset.tablet_schema().schema_version(), schema_versions[i]);
            EXPECT_EQ(rowset.index_id(), index_id);
            EXPECT_EQ(rowset.schema_version(), schema_versions[i]);
        }
        ASSERT_TRUE(get_rowset_res->has_stats());
        EXPECT_EQ(get_rowset_res->stats().num_rows(), 2500);
        EXPECT_EQ(get_rowset_res->stats().num_rowsets(), 26);
        EXPECT_EQ(get_rowset_res->stats().num_segments(), 25);
        EXPECT_EQ(get_rowset_res->stats().data_size(), 275000);
        EXPECT_EQ(get_rowset_res->stats().index_size(), 25000);
        EXPECT_EQ(get_rowset_res->stats().segment_size(), 250000);
        if (schema != nullptr) {
            // Rowset 10 is the last one written with write_schema_kv=false.
            // Guard the column/index accesses below so that a short schema fails cleanly
            // instead of indexing out of range.
            ASSERT_GE(get_rowset_res->rowset_meta(10).tablet_schema().column_size(), 3);
            ASSERT_GE(get_rowset_res->rowset_meta(10).tablet_schema().index_size(), 2);
            auto schema_version = get_rowset_res->rowset_meta(10).schema_version();
            get_rowset_res->mutable_rowset_meta(10)->mutable_tablet_schema()->set_schema_version(
                    10086);
            std::cout << get_rowset_res->rowset_meta(10).tablet_schema().ShortDebugString()
                      << std::endl;
            std::cout << schema->ShortDebugString() << std::endl;
            EXPECT_EQ(get_rowset_res->rowset_meta(10).tablet_schema().column(2).type(),
                      schema->column(2).type());
            EXPECT_EQ(get_rowset_res->rowset_meta(10).tablet_schema().index(0).index_suffix_name(),
                      schema->index(0).index_suffix_name());
            EXPECT_EQ(get_rowset_res->rowset_meta(10).tablet_schema().index(1).index_id(),
                      schema->index(1).index_id());
            get_rowset_res->mutable_rowset_meta(10)->mutable_tablet_schema()->set_schema_version(
                    schema_version);
        }
    };
    ASSERT_NO_FATAL_FAILURE(insert_and_get_rowset(10031, 10032, 10033, 10034, 300));
    // use arena
    google::protobuf::Arena arena;
    ASSERT_NO_FATAL_FAILURE(insert_and_get_rowset(10041, 10042, 10043, 10044, 400, &arena));
    TabletSchemaCloudPB schema = getVariantSchema();
    ASSERT_NO_FATAL_FAILURE(
            insert_and_get_rowset(10051, 10052, 10053, 10054, 500, &arena, &schema));
}
// Checks what happens when a rowset is prepared or committed again for a (txn, tablet) pair
// that already has one. Both prepare_rowset and commit_rowset must answer ALREADY_EXISTED
// and return the existing rowset meta, with index_id, schema_version and the tablet schema
// filled in. This is tested in two setups:
//  - the existing tmp rowset was written by an old MS (directly into txn_kv, schema embedded);
//  - the existing rowset was prepared and committed by a new MS with write_schema_kv=true,
//    once with a heap-allocated response and once with an arena-allocated one.
// NOTE: sets config::write_schema_kv and does not restore it.
TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
auto meta_service = get_meta_service();
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->clear_all_call_backs();
};
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
ret->second = true;
});
sp->enable_processing();
int64_t db_id = 1000;
// old MS commit rowset, new MS commit rowset again
{
constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004;
config::write_schema_kv = false;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
partition_id, tablet_id, next_rowset_id(), 1));
// Simulate the old MS: store the tmp rowset of txn 10005 directly, with its schema embedded.
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
auto committed_rowset = create_rowset(10005, tablet_id, next_rowset_id(), 2, 2);
std::string tmp_rowset_key, tmp_rowset_val;
// 0:instance_id 1:txn_id 2:tablet_id
meta_rowset_tmp_key({instance_id, 10005, tablet_id}, &tmp_rowset_key);
ASSERT_TRUE(committed_rowset.SerializeToString(&tmp_rowset_val));
txn->put(tmp_rowset_key, tmp_rowset_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
// Same txn and tablet but a different rowset id: the existing rowset must be reported.
CreateRowsetResponse res;
auto new_rowset = create_rowset(10005, tablet_id, next_rowset_id(), 2, 2);
prepare_rowset(meta_service.get(), new_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res.has_existed_rowset_meta());
EXPECT_EQ(res.existed_rowset_meta().rowset_id_v2(), committed_rowset.rowset_id_v2());
EXPECT_EQ(res.existed_rowset_meta().index_id(), index_id);
EXPECT_EQ(res.existed_rowset_meta().schema_version(), 2);
ASSERT_TRUE(res.existed_rowset_meta().has_tablet_schema());
EXPECT_EQ(res.existed_rowset_meta().tablet_schema().schema_version(), 2);
res.Clear();
commit_rowset(meta_service.get(), new_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res.has_existed_rowset_meta());
EXPECT_EQ(res.existed_rowset_meta().rowset_id_v2(), committed_rowset.rowset_id_v2());
EXPECT_EQ(res.existed_rowset_meta().index_id(), index_id);
EXPECT_EQ(res.existed_rowset_meta().schema_version(), 2);
ASSERT_TRUE(res.existed_rowset_meta().has_tablet_schema());
EXPECT_EQ(res.existed_rowset_meta().tablet_schema().schema_version(), 2);
}
// new MS commit rowset, new MS commit rowset again
// The lambda prepares and commits a rowset for `txn_id` with write_schema_kv=true, then
// re-sends a second rowset (new rowset id, same txn) and expects ALREADY_EXISTED from both
// calls. If `arena` is given, the response is allocated on it; otherwise it is heap-allocated
// and deleted on scope exit.
auto insert_existed_rowset = [&meta_service, db_id](int64_t table_id, int64_t index_id,
int64_t partition_id, int64_t tablet_id,
int64_t txn_id,
google::protobuf::Arena* arena = nullptr) {
config::write_schema_kv = true;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
partition_id, tablet_id, next_rowset_id(), 1));
auto committed_rowset = create_rowset(txn_id, tablet_id, next_rowset_id(), 2, 2);
auto res = google::protobuf::Arena::CreateMessage<CreateRowsetResponse>(arena);
DORIS_CLOUD_DEFER {
if (!arena) delete res;
};
prepare_rowset(meta_service.get(), committed_rowset, *res);
ASSERT_EQ(res->status().code(), MetaServiceCode::OK);
res->Clear();
commit_rowset(meta_service.get(), committed_rowset, *res);
ASSERT_EQ(res->status().code(), MetaServiceCode::OK);
res->Clear();
auto new_rowset = create_rowset(txn_id, tablet_id, next_rowset_id(), 2, 2);
prepare_rowset(meta_service.get(), new_rowset, *res);
ASSERT_EQ(res->status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res->has_existed_rowset_meta());
EXPECT_EQ(res->existed_rowset_meta().rowset_id_v2(), committed_rowset.rowset_id_v2());
EXPECT_EQ(res->existed_rowset_meta().index_id(), index_id);
EXPECT_EQ(res->existed_rowset_meta().schema_version(), 2);
ASSERT_TRUE(res->existed_rowset_meta().has_tablet_schema());
EXPECT_EQ(res->existed_rowset_meta().tablet_schema().schema_version(), 2);
res->Clear();
commit_rowset(meta_service.get(), new_rowset, *res);
ASSERT_EQ(res->status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res->has_existed_rowset_meta());
EXPECT_EQ(res->existed_rowset_meta().rowset_id_v2(), committed_rowset.rowset_id_v2());
EXPECT_EQ(res->existed_rowset_meta().index_id(), index_id);
EXPECT_EQ(res->existed_rowset_meta().schema_version(), 2);
ASSERT_TRUE(res->existed_rowset_meta().has_tablet_schema());
EXPECT_EQ(res->existed_rowset_meta().tablet_schema().schema_version(), 2);
};
insert_existed_rowset(10011, 10012, 10013, 10014, 10015);
google::protobuf::Arena arena;
insert_existed_rowset(10021, 10022, 10023, 10024, 10025, &arena);
}
// With write_schema_kv=true, creates two tablets on the same index:
//  - tablet 10004 with config::meta_schema_value_version = 0 and schema version 1;
//  - tablet 10005 with config::meta_schema_value_version = 1 and schema version 2.
// It then checks that get_tablet returns each with a schema at the expected version.
// NOTE(review): despite its name, this test does not re-insert an existing rowset.
// NOTE: leaves config::write_schema_kv and config::meta_schema_value_version modified.
TEST(SchemaKVTest, InsertExistedRowsetTest) {
    auto meta_service = get_meta_service();
    // meta_service->resource_mgr().reset(); // Do not use resource manager
    auto* sync_point = SyncPoint::get_instance();
    DORIS_CLOUD_DEFER {
        SyncPoint::get_instance()->clear_all_call_backs();
    };
    sync_point->set_call_back("get_instance_id", [&](auto&& args) {
        auto* ret = try_any_cast_ret<std::string>(args);
        ret->first = instance_id;
        ret->second = true;
    });
    sync_point->enable_processing();

    constexpr int64_t db_id = 2000;
    constexpr int64_t table_id = 10001, index_id = 10002, partition_id = 10003;
    constexpr int64_t first_tablet_id = 10004, second_tablet_id = 10005;
    config::write_schema_kv = true;

    config::meta_schema_value_version = 0;
    ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
                                          partition_id, first_tablet_id, next_rowset_id(), 1));
    check_get_tablet(meta_service.get(), first_tablet_id, 1);

    config::meta_schema_value_version = 1;
    ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
                                          partition_id, second_tablet_id, next_rowset_id(), 2));
    check_get_tablet(meta_service.get(), second_tablet_id, 2);
}
// Fetches `tablet_id` through GetTablet and checks the returned tablet meta:
// it must carry a schema whose version is `schema_version`, with 10 columns
// (the column count fill_schema produces), and whose disable_auto_compaction
// flag equals `expected_disable_auto_compaction`.
// Failures are attributed to the tablet id in the assertion message.
static void check_schema(MetaServiceProxy* meta_service, int64_t tablet_id, int32_t schema_version,
                         bool expected_disable_auto_compaction = false) {
    brpc::Controller cntl;
    GetTabletRequest req;
    GetTabletResponse res;
    req.set_tablet_id(tablet_id);
    meta_service->get_tablet(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
    ASSERT_TRUE(res.has_tablet_meta()) << tablet_id;
    // Without a schema the remaining checks would only compare protobuf
    // defaults and produce misleading follow-on failures, so stop here.
    ASSERT_TRUE(res.tablet_meta().has_schema()) << tablet_id;
    EXPECT_EQ(res.tablet_meta().schema_version(), schema_version) << tablet_id;
    EXPECT_EQ(res.tablet_meta().schema().column_size(), 10) << tablet_id;
    EXPECT_EQ(res.tablet_meta().schema().disable_auto_compaction(),
              expected_disable_auto_compaction)
            << tablet_id;
}
// Issues a single-entry UpdateTablet RPC that turns on disable_auto_compaction
// for `tablet_id`, and asserts the meta service replies with OK.
static void update_tablet(MetaServiceProxy* meta_service, int64_t tablet_id) {
    UpdateTabletRequest request;
    auto* info = request.add_tablet_meta_infos();
    info->set_tablet_id(tablet_id);
    info->set_disable_auto_compaction(true);

    brpc::Controller controller;
    UpdateTabletResponse response;
    meta_service->update_tablet(&controller, &request, &response, nullptr);
    ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << tablet_id;
}
// Checks that UpdateTablet can turn on disable_auto_compaction for a tablet
// whose schema was written under various combinations of
// config::write_schema_kv and config::meta_schema_value_version, including
// when write_schema_kv is switched on between tablet creation and the update.
// Each case uses a fresh meta service obtained from get_meta_service().
TEST(AlterSchemaKVTest, AlterDisableAutoCompactionTest) {
    constexpr int64_t db_id = 3000;

    // case 1: config::write_schema_kv = true
    {
        auto meta_service = get_meta_service();
        config::write_schema_kv = true;
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, 10001, 10002, 10003,
                                              10004, next_rowset_id(), 0));
        ASSERT_NO_FATAL_FAILURE(check_get_tablet(meta_service.get(), 10004, 0));
        ASSERT_NO_FATAL_FAILURE(check_schema(meta_service.get(), 10004, 0));
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, 10001, 10002, 10003,
                                              10005, next_rowset_id(), 2));
        ASSERT_NO_FATAL_FAILURE(check_get_tablet(meta_service.get(), 10005, 2));
        ASSERT_NO_FATAL_FAILURE(update_tablet(meta_service.get(), 10005));
        ASSERT_NO_FATAL_FAILURE(check_schema(meta_service.get(), 10005, 2, true));
    }

    // case 2: config::write_schema_kv = false
    {
        auto meta_service = get_meta_service();
        config::write_schema_kv = false;
        // Restore the flag on scope exit, including the early return taken by a
        // failed ASSERT.
        DORIS_CLOUD_DEFER {
            config::write_schema_kv = true;
        };
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, 10001, 10002, 10003,
                                              10004, next_rowset_id(), 0));
        ASSERT_NO_FATAL_FAILURE(check_get_tablet(meta_service.get(), 10004, 0));
        ASSERT_NO_FATAL_FAILURE(check_schema(meta_service.get(), 10004, 0));
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, 10001, 10002, 10003,
                                              10005, next_rowset_id(), 2));
        ASSERT_NO_FATAL_FAILURE(check_get_tablet(meta_service.get(), 10005, 2));
        ASSERT_NO_FATAL_FAILURE(update_tablet(meta_service.get(), 10005));
        ASSERT_NO_FATAL_FAILURE(check_schema(meta_service.get(), 10005, 2, true));
    }

    // case 3: config::write_schema_kv = false while creating tablets, then
    // config::write_schema_kv = true before updating.
    {
        auto meta_service = get_meta_service();
        config::write_schema_kv = false;
        DORIS_CLOUD_DEFER {
            config::write_schema_kv = true;
        };
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, 10001, 10002, 10003,
                                              10004, next_rowset_id(), 0));
        ASSERT_NO_FATAL_FAILURE(check_get_tablet(meta_service.get(), 10004, 0));
        ASSERT_NO_FATAL_FAILURE(check_schema(meta_service.get(), 10004, 0));
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, 10001, 10002, 10003,
                                              10005, next_rowset_id(), 2));
        ASSERT_NO_FATAL_FAILURE(check_get_tablet(meta_service.get(), 10005, 2));
        config::write_schema_kv = true;
        ASSERT_NO_FATAL_FAILURE(update_tablet(meta_service.get(), 10005));
        ASSERT_NO_FATAL_FAILURE(check_schema(meta_service.get(), 10005, 2, true));
    }

    // case 4: as case 3, but the first tablet is created with
    // meta_schema_value_version = 0 and the second with meta_schema_value_version = 1.
    {
        auto meta_service = get_meta_service();
        config::write_schema_kv = false;
        DORIS_CLOUD_DEFER {
            config::write_schema_kv = true;
            config::meta_schema_value_version = 1;
        };
        config::meta_schema_value_version = 0;
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, 10001, 10002, 10003,
                                              10004, next_rowset_id(), 0));
        ASSERT_NO_FATAL_FAILURE(check_get_tablet(meta_service.get(), 10004, 0));
        ASSERT_NO_FATAL_FAILURE(check_schema(meta_service.get(), 10004, 0));
        config::meta_schema_value_version = 1;
        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, 10001, 10002, 10003,
                                              10005, next_rowset_id(), 2));
        ASSERT_NO_FATAL_FAILURE(check_get_tablet(meta_service.get(), 10005, 2));
        config::write_schema_kv = true;
        ASSERT_NO_FATAL_FAILURE(update_tablet(meta_service.get(), 10005));
        ASSERT_NO_FATAL_FAILURE(check_schema(meta_service.get(), 10005, 2, true));
    }
}
// Helper: returns how many keys `txn` sees in the range
// [base_key, base_key + encode_int64(INT64_MAX)), i.e. the blob chunks stored
// under `base_key`. If the range read fails, a non-fatal test failure is
// recorded and -1 is returned.
static int count_blob_chunks(Transaction* txn, const std::string& base_key) {
    std::string range_end(base_key);
    encode_int64(INT64_MAX, &range_end);

    std::unique_ptr<RangeGetIterator> it;
    const TxnErrorCode code = txn->get(base_key, range_end, &it);
    EXPECT_EQ(code, TxnErrorCode::TXN_OK);
    if (code != TxnErrorCode::TXN_OK) {
        return -1;
    }

    int chunks = 0;
    for (; it->has_next(); ++chunks) {
        it->next();
    }
    return chunks;
}
// Verify that ValueBuf::remove() before blob_put is necessary to clean up
// orphaned blob chunks when the number of chunks decreases.
TEST(AlterSchemaKVTest, BlobRemoveBeforeOverwriteTest) {
    auto meta_service = get_meta_service();

    // The config knobs set below are process-wide; snapshot them so later tests
    // do not inherit this test's values.
    const auto saved_write_schema_kv = config::write_schema_kv;
    const auto saved_meta_schema_value_version = config::meta_schema_value_version;
    auto sp = SyncPoint::get_instance();
    DORIS_CLOUD_DEFER {
        sp->clear_all_call_backs();
        sp->disable_processing();
        config::write_schema_kv = saved_write_schema_kv;
        config::meta_schema_value_version = saved_meta_schema_value_version;
    };
    sp->set_call_back("get_instance_id", [&](auto&& args) {
        auto* ret = try_any_cast_ret<std::string>(args);
        ret->first = instance_id;
        ret->second = true;
    });
    sp->enable_processing();

    constexpr int64_t db_id = 4000;
    constexpr int64_t table_id = 40001;
    constexpr int64_t index_id = 40002;
    constexpr int64_t partition_id = 40003;
    constexpr int64_t tablet_id = 40004;
    constexpr int32_t schema_version = 0;
    config::write_schema_kv = true;
    config::meta_schema_value_version = 1;

    // Create a tablet with a normal 10-column schema.
    ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
                                          partition_id, tablet_id, next_rowset_id(),
                                          schema_version));
    ASSERT_NO_FATAL_FAILURE(check_get_tablet(meta_service.get(), tablet_id, schema_version));

    // Compute the schema KV key.
    auto schema_key = meta_schema_key({instance_id, index_id, schema_version});

    // Rewrites the schema KV so it is stored as exactly 3 blob chunks.
    // Existing chunks are removed first so none survive the rewrite. When
    // `reset_disable_auto_compaction` is true, disable_auto_compaction is set to
    // false so that a later update_tablet has something to change.
    auto rewrite_as_three_chunks = [&](bool reset_disable_auto_compaction) {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        ValueBuf old_val;
        ASSERT_EQ(cloud::blob_get(txn.get(), schema_key, &old_val), TxnErrorCode::TXN_OK);
        old_val.remove(txn.get());
        doris::TabletSchemaCloudPB schema;
        ASSERT_TRUE(old_val.to_pb(&schema));
        if (reset_disable_auto_compaction) {
            schema.set_disable_auto_compaction(false);
        }
        std::string serialized = schema.SerializeAsString();
        // floor(size / 3) + 1 bytes per chunk: three chunks always cover the
        // value, and two do not once the schema is more than a few bytes long.
        size_t split_size = (serialized.size() / 3) + 1;
        cloud::blob_put(txn.get(), schema_key, schema, config::meta_schema_value_version,
                        split_size);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    };

    // Overwrite the schema KV with a small split_size to create 3 blob chunks.
    ASSERT_NO_FATAL_FAILURE(rewrite_as_three_chunks(false));
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        EXPECT_EQ(count_blob_chunks(txn.get(), schema_key), 3);
    }

    // Case 1: update_tablet WITH remove (default behavior).
    // The remove cleans up the old 3 chunks, then blob_put writes 1 chunk
    // (with the default split_size of 90KB, the small schema fits in 1 chunk).
    ASSERT_NO_FATAL_FAILURE(update_tablet(meta_service.get(), tablet_id));
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        EXPECT_EQ(count_blob_chunks(txn.get(), schema_key), 1)
                << "With remove: should have 1 chunk (old 3 cleaned up)";
        // Verify schema is correct
        ValueBuf val;
        ASSERT_EQ(cloud::blob_get(txn.get(), schema_key, &val), TxnErrorCode::TXN_OK);
        doris::TabletSchemaCloudPB schema;
        ASSERT_TRUE(val.to_pb(&schema));
        EXPECT_TRUE(schema.disable_auto_compaction());
    }
    ASSERT_NO_FATAL_FAILURE(check_schema(meta_service.get(), tablet_id, schema_version, true));

    // Reset: overwrite the schema KV again with 3 blob chunks and
    // disable_auto_compaction=false so update_tablet will modify it again.
    ASSERT_NO_FATAL_FAILURE(rewrite_as_three_chunks(true));
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        EXPECT_EQ(count_blob_chunks(txn.get(), schema_key), 3);
    }

    // Case 2: update_tablet WITHOUT remove (inject skip via sync point).
    // This simulates what would happen if we didn't have the remove logic.
    sp->set_call_back("update_tablet::skip_schema_remove",
                      [](auto&& args) { *try_any_cast<bool*>(args[0]) = true; });
    ASSERT_NO_FATAL_FAILURE(update_tablet(meta_service.get(), tablet_id));
    sp->clear_call_back("update_tablet::skip_schema_remove");
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        int chunks = count_blob_chunks(txn.get(), schema_key);
        EXPECT_EQ(chunks, 3) << "Without remove: should have 3 chunks (2 orphaned + 1 overwritten)";
        // blob_get reads all 3 chunks and concatenates them, producing
        // corrupted data that cannot be parsed correctly.
        ValueBuf val;
        ASSERT_EQ(cloud::blob_get(txn.get(), schema_key, &val), TxnErrorCode::TXN_OK);
        doris::TabletSchemaCloudPB schema;
        // The deserialized schema may parse but contain garbage from
        // orphaned chunks, or the column count will be wrong.
        if (val.to_pb(&schema)) {
            // If it parses, the data is corrupted - the column count or
            // content won't match the expected schema.
            LOG(INFO) << "Without remove: parsed schema has " << schema.column_size()
                      << " columns (expected 10), disable_auto_compaction="
                      << schema.disable_auto_compaction();
        } else {
            LOG(INFO) << "Without remove: schema parse failed as expected (corrupted data)";
        }
    }
}
} // namespace doris::cloud