blob: aa9c2fd13cdc3ea702ac27567a47285c00d614f7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gmock/gmock-more-matchers.h>
#include <gtest/gtest.h>
#include "common/consts.h"
#include "exec/common/variant_util.h"
#include "storage/rowset/beta_rowset_writer.h"
#include "storage/rowset/rowset_factory.h"
#include "storage/segment/segment_loader.h"
#include "storage/segment/variant/variant_column_writer_impl.h"
#include "storage/storage_engine.h"
#include "storage/tablet/tablet_schema.h"
using namespace doris;
using namespace doris::segment_v2;
using namespace doris;
// Size of the stack buffer handed to getcwd() in the fixture.
static constexpr uint32_t MAX_PATH_LEN = 1024;
// Fixture DataDir root, appended to the current working directory.
static constexpr std::string_view dest_dir = "/ut_dir/schema_util_test";
// Relative directory registered as the ExecEnv tmp-file dir.
static constexpr std::string_view tmp_dir = "./ut_dir/tmp";
class SchemaUtilRowsetTest : public testing::Test {
protected:
void SetUp() override {
// absolute dir
char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
_curreent_dir = std::string(buffer);
_absolute_dir = _curreent_dir + std::string(dest_dir);
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_absolute_dir).ok());
// tmp dir
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok());
std::vector<StorePath> paths;
paths.emplace_back(std::string(tmp_dir), 1024000000);
auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths);
EXPECT_TRUE(tmp_file_dirs->init().ok());
ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs));
// storage engine
doris::EngineOptions options;
auto engine = std::make_unique<StorageEngine>(options);
_engine_ref = engine.get();
_data_dir = std::make_unique<DataDir>(*_engine_ref, _absolute_dir);
static_cast<void>(_data_dir->update_capacity());
EXPECT_TRUE(_data_dir->init(true).ok());
ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
}
void TearDown() override {
//EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
_engine_ref = nullptr;
ExecEnv::GetInstance()->set_storage_engine(nullptr);
}
public:
SchemaUtilRowsetTest() = default;
virtual ~SchemaUtilRowsetTest() = default;
private:
StorageEngine* _engine_ref = nullptr;
std::unique_ptr<DataDir> _data_dir = nullptr;
TabletSharedPtr _tablet = nullptr;
std::string _absolute_dir;
std::string _curreent_dir;
};
// Fills `column_pb` with a non-nullable column definition (uid, name, type, key flag).
// VARIANT columns additionally get a max-subcolumns count of 3 and, when `add_children` is
// true, one child column named "key0" of type STRING.
static void construct_column(ColumnPB* column_pb, int32_t col_unique_id,
                             const std::string& column_type, const std::string& column_name,
                             bool is_key = false, bool add_children = false) {
    column_pb->set_unique_id(col_unique_id);
    column_pb->set_name(column_name);
    column_pb->set_type(column_type);
    column_pb->set_is_key(is_key);
    column_pb->set_is_nullable(false);
    if (column_type != "VARIANT") {
        return;
    }
    column_pb->set_variant_max_subcolumns_count(3);
    if (!add_children) {
        return;
    }
    ColumnPB* typed_child = column_pb->add_children_columns();
    typed_child->set_name("key0");
    typed_child->set_type("STRING");
}
// static void construct_tablet_index(TabletIndexPB* tablet_index, int64_t index_id, const std::string& index_name, int32_t col_unique_id) {
// tablet_index->set_index_id(index_id);
// tablet_index->set_index_name(index_name);
// tablet_index->set_index_type(IndexType::INVERTED);
// tablet_index->add_col_unique_id(col_unique_id);
// }
// Expected per-path occurrence counts, keyed by variant column unique id (the `uid` argument of
// fill_string_column_with_test_data; 1 for v1 and 3 for v3 in fill_block_with_test_data).
// Every generated JSON key increments its counter. Nothing resets it automatically; some tests
// call all_path_stats.clear() before generating data.
static std::unordered_map<int32_t, variant_util::PathToNoneNullValues> all_path_stats;
// Appends `size` JSON objects to `column_string`. Each object has 1..10 keys "key0", "key1", ...;
// even-indexed keys get an integer 0..99, odd-indexed keys get a string "strNN".
// std::rand is reseeded with 42 on every call, so every call produces the same sequence.
// Each emitted key also bumps all_path_stats[uid][key].
static void fill_string_column_with_test_data(auto& column_string, int size, int uid) {
    std::srand(42);
    for (int row = 0; row < size; ++row) {
        std::string json = "{";
        const int pair_count = std::rand() % 10 + 1;
        for (int idx = 0; idx < pair_count; ++idx) {
            const std::string key = "key" + std::to_string(idx);
            if (idx > 0) {
                json += ", ";
            }
            json += "\"" + key + "\" : ";
            if (idx % 2 == 0) {
                json += std::to_string(std::rand() % 100);
            } else {
                json += "\"str" + std::to_string(std::rand() % 100) + "\"";
            }
            ++all_path_stats[uid][key];
        }
        json += "}";
        column_string->insert_data(json.data(), json.size());
    }
}
static void fill_varaint_column(auto& variant_column, int size, int uid) {
auto type_string = std::make_shared<DataTypeString>();
auto column = type_string->create_column();
auto column_string = assert_cast<ColumnString*>(column.get());
fill_string_column_with_test_data(column_string, size, uid);
ParseConfig config;
config.deprecated_enable_flatten_nested = false;
variant_util::parse_json_to_variant(*variant_column, *column_string, config);
}
// Fills the five test columns with `size` rows:
//   [0] key INT     = row index
//   [1] v1 VARIANT  = generated JSON (path stats recorded under uid 1)
//   [2] v2 STRING   = "V2"
//   [3] v3 VARIANT  = generated JSON (path stats recorded under uid 3)
//   [4] v4 INT      = row index
static void fill_block_with_test_data(Block* block, int size) {
    auto columns = block->mutate_columns();
    for (int row = 0; row < size; ++row) {
        columns[0]->insert(Field::create_field<PrimitiveType::TYPE_INT>(row));
    }
    fill_varaint_column(columns[1], size, 1);
    for (int row = 0; row < size; ++row) {
        columns[2]->insert(Field::create_field<PrimitiveType::TYPE_STRING>("V2"));
    }
    fill_varaint_column(columns[3], size, 3);
    for (int row = 0; row < size; ++row) {
        columns[4]->insert(Field::create_field<PrimitiveType::TYPE_INT>(row));
    }
}
// Counter consumed by rowset_writer_context(): each call uses the current value as both the
// rowset id and the version [inc_id, inc_id], then increments it. Shared by all tests in this file.
static int64_t inc_id = 1000;
// Builds the writer context for the next input rowset of `tablet`: a VISIBLE beta rowset whose
// id and single-version range [id, id] both come from the file-level inc_id counter (advanced
// by one per call). Segments are capped at 200 rows.
static RowsetWriterContext rowset_writer_context(const std::unique_ptr<DataDir>& data_dir,
                                                 const TabletSchemaSPtr& schema,
                                                 const std::string& tablet_path,
                                                 const TabletSharedPtr& tablet) {
    const int64_t id = inc_id++;
    RowsetId rowset_id;
    rowset_id.init(id);

    RowsetWriterContext context;
    context.rowset_id = rowset_id;
    context.rowset_type = BETA_ROWSET;
    context.rowset_state = VISIBLE;
    context.version = Version(id, id);
    context.max_rows_per_segment = 200;
    context.tablet = tablet;
    context.tablet_id = tablet->tablet_id();
    context.tablet_path = tablet_path;
    context.tablet_schema = schema;
    context.data_dir = data_dir.get();
    return context;
}
// Writes 1000 generated rows (fill_block_with_test_data) through `rowset_writer`, flushes and
// builds the rowset. With max_rows_per_segment = 200 (rowset_writer_context) five segments
// are expected. Failures are recorded via EXPECT_*/ADD_FAILURE (ASSERT_* cannot be used in a
// non-void function). Returns nullptr if build() produced no rowset.
static RowsetSharedPtr create_rowset(auto& rowset_writer, const TabletSchemaSPtr& tablet_schema) {
    Block block = tablet_schema->create_block();
    fill_block_with_test_data(&block, 1000);
    auto st = rowset_writer->add_block(&block);
    EXPECT_TRUE(st.ok()) << st.msg();
    st = rowset_writer->flush();
    EXPECT_TRUE(st.ok()) << st.msg();
    RowsetSharedPtr rowset;
    st = rowset_writer->build(rowset);
    EXPECT_TRUE(st.ok()) << st.msg();
    // Do not dereference a rowset that build() did not produce.
    if (rowset == nullptr) {
        ADD_FAILURE() << "rowset_writer->build() did not produce a rowset";
        return nullptr;
    }
    EXPECT_EQ(rowset->num_segments(), 5);
    return rowset;
}
// AGG_KEYS tablet with two VARIANT columns: check_path_stats over five freshly written rowsets
// (using the first rowset as the "output") is expected to succeed.
TEST_F(SchemaUtilRowsetTest, check_path_stats_agg_key) {
    // 1.create tablet schema
    TabletSchemaPB schema_pb;
    schema_pb.set_keys_type(AGG_KEYS);
    construct_column(schema_pb.add_column(), 0, "INT", "key", true);
    construct_column(schema_pb.add_column(), 1, "VARIANT", "v1");
    construct_column(schema_pb.add_column(), 2, "STRING", "v2");
    construct_column(schema_pb.add_column(), 3, "VARIANT", "v3");
    construct_column(schema_pb.add_column(), 4, "INT", "v4");
    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
    tablet_schema->init_from_pb(schema_pb);
    // 2. create tablet
    TabletMetaSharedPtr tablet_meta(new TabletMeta(tablet_schema));
    // set seed
    std::srand(42);
    bool external_segment_meta_used_default = rand() % 2 == 0;
    std::cout << "external_segment_meta_used_default: " << external_segment_meta_used_default
              << std::endl;
    tablet_schema->set_external_segment_meta_used_default(external_segment_meta_used_default);
    std::string absolute_dir = _curreent_dir + std::string("/ut_dir/schema_util_rows");
    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(absolute_dir).ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(absolute_dir).ok());
    // Test-local DataDir/Tablet; named without the leading underscore so they do not shadow the
    // fixture members of the same role.
    std::unique_ptr<DataDir> data_dir = std::make_unique<DataDir>(*_engine_ref, absolute_dir);
    static_cast<void>(data_dir->update_capacity());
    EXPECT_TRUE(data_dir->init(true).ok());
    TabletSharedPtr tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, data_dir.get());
    EXPECT_TRUE(tablet->init().ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());
    // 3. create rowset
    std::vector<RowsetSharedPtr> rowsets;
    for (int i = 0; i < 5; i++) {
        const auto& res = RowsetFactory::create_rowset_writer(
                *_engine_ref,
                rowset_writer_context(data_dir, tablet_schema, tablet->tablet_path(), tablet),
                false);
        // Fatal: res.value() must not be called on an error result.
        ASSERT_TRUE(res.has_value()) << res.error();
        const auto& rowset_writer = res.value();
        auto rowset = create_rowset(rowset_writer, tablet_schema);
        ASSERT_TRUE(rowset != nullptr);
        EXPECT_TRUE(tablet->add_rowset(rowset).ok());
        rowsets.push_back(rowset);
    }
    // 4. check path stats, using the first input rowset as the output rowset
    EXPECT_TRUE(variant_util::VariantCompactionUtil::check_path_stats(rowsets, rowsets[0], tablet)
                        .ok());
}
// Same rowset layout as check_path_stats_agg_key, but the schema uses the default keys type and
// sets delete_sign_idx to 0. check_path_stats is expected to fail for it.
TEST_F(SchemaUtilRowsetTest, check_path_stats_agg_delete) {
    // 1.create tablet schema
    TabletSchemaPB schema_pb;
    schema_pb.set_delete_sign_idx(0);
    construct_column(schema_pb.add_column(), 0, "INT", "key", true);
    construct_column(schema_pb.add_column(), 1, "VARIANT", "v1");
    construct_column(schema_pb.add_column(), 2, "STRING", "v2");
    construct_column(schema_pb.add_column(), 3, "VARIANT", "v3");
    construct_column(schema_pb.add_column(), 4, "INT", "v4");
    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
    tablet_schema->init_from_pb(schema_pb);
    // 2. create tablet
    TabletMetaSharedPtr tablet_meta(new TabletMeta(tablet_schema));
    // set seed
    std::srand(42);
    bool external_segment_meta_used_default = rand() % 2 == 0;
    std::cout << "external_segment_meta_used_default: " << external_segment_meta_used_default
              << std::endl;
    tablet_schema->set_external_segment_meta_used_default(external_segment_meta_used_default);
    std::string absolute_dir = _curreent_dir + std::string("/ut_dir/schema_util_rows1");
    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(absolute_dir).ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(absolute_dir).ok());
    // Test-local DataDir/Tablet; named without the leading underscore so they do not shadow the
    // fixture members of the same role.
    std::unique_ptr<DataDir> data_dir = std::make_unique<DataDir>(*_engine_ref, absolute_dir);
    static_cast<void>(data_dir->update_capacity());
    EXPECT_TRUE(data_dir->init(true).ok());
    TabletSharedPtr tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, data_dir.get());
    EXPECT_TRUE(tablet->init().ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());
    // 3. create rowset
    std::vector<RowsetSharedPtr> rowsets;
    for (int i = 0; i < 5; i++) {
        const auto& res = RowsetFactory::create_rowset_writer(
                *_engine_ref,
                rowset_writer_context(data_dir, tablet_schema, tablet->tablet_path(), tablet),
                false);
        // Fatal: res.value() must not be called on an error result.
        ASSERT_TRUE(res.has_value()) << res.error();
        const auto& rowset_writer = res.value();
        auto rowset = create_rowset(rowset_writer, tablet_schema);
        ASSERT_TRUE(rowset != nullptr);
        EXPECT_TRUE(tablet->add_rowset(rowset).ok());
        rowsets.push_back(rowset);
    }
    // 4. check path stats, using the first input rowset as the output rowset
    Status st = variant_util::VariantCompactionUtil::check_path_stats(rowsets, rowsets[0], tablet);
    std::cout << st.to_string() << std::endl;
    EXPECT_FALSE(st.ok());
}
// Mixed-format compatibility: one tablet contains both old (inline meta) and new
// (external column meta + variant ext meta) segments, and path stats / scanning
// logic should work across all rowsets.
TEST_F(SchemaUtilRowsetTest, mixed_external_segment_meta_old_new) {
    // 1. create tablet schema (same layout as check_path_stats_agg_key)
    TabletSchemaPB schema_pb;
    schema_pb.set_keys_type(AGG_KEYS);
    construct_column(schema_pb.add_column(), 0, "INT", "key", true);
    construct_column(schema_pb.add_column(), 1, "VARIANT", "v1");
    construct_column(schema_pb.add_column(), 2, "STRING", "v2");
    construct_column(schema_pb.add_column(), 3, "VARIANT", "v3");
    construct_column(schema_pb.add_column(), 4, "INT", "v4");
    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
    tablet_schema->init_from_pb(schema_pb);
    // 2. create tablet and data dir
    TabletMetaSharedPtr tablet_meta(new TabletMeta(tablet_schema));
    // First write a few rowsets with external_segment_meta_used_default = false (old format)
    tablet_schema->set_external_segment_meta_used_default(false);
    std::string absolute_dir = _curreent_dir + std::string("/ut_dir/schema_util_rows_mixed");
    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(absolute_dir).ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(absolute_dir).ok());
    std::unique_ptr<DataDir> data_dir = std::make_unique<DataDir>(*_engine_ref, absolute_dir);
    static_cast<void>(data_dir->update_capacity());
    EXPECT_TRUE(data_dir->init(true).ok());
    TabletSharedPtr tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, data_dir.get());
    EXPECT_TRUE(tablet->init().ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());
    // 3. create multiple rowsets: first batch uses old footer format, second batch uses new format
    std::vector<RowsetSharedPtr> rowsets;
    // Writes one rowset with the schema's current external-meta default and appends it to
    // `rowsets`. A failing ASSERT_* only returns from this lambda; the size check below then
    // stops the test before rowsets[0] is used.
    auto create_and_add_rowset = [&](int /*idx*/) {
        const auto& res = RowsetFactory::create_rowset_writer(
                *_engine_ref,
                rowset_writer_context(data_dir, tablet_schema, tablet->tablet_path(), tablet),
                false);
        ASSERT_TRUE(res.has_value()) << res.error();
        const auto& rowset_writer = res.value();
        auto rowset = create_rowset(rowset_writer, tablet_schema);
        ASSERT_TRUE(rowset != nullptr);
        EXPECT_TRUE(tablet->add_rowset(rowset).ok());
        rowsets.push_back(rowset);
    };
    // 3.1 write a few old-format rowsets (inline meta only)
    for (int i = 0; i < 3; ++i) {
        create_and_add_rowset(i);
    }
    // 3.2 flip tablet default to enable external column meta for subsequent segments
    tablet_schema->set_external_segment_meta_used_default(true);
    // 3.3 write a few new-format rowsets (external meta enabled)
    for (int i = 0; i < 3; ++i) {
        create_and_add_rowset(i + 3);
    }
    // Fatal: rowsets[0] is used below.
    ASSERT_EQ(rowsets.size(), 6U);
    // 4. check that VariantCompactionUtil::check_path_stats works across mixed segments
    // This will internally create Segment / ColumnReader instances and should be
    // insensitive to whether a particular segment uses inline or external meta.
    EXPECT_TRUE(variant_util::VariantCompactionUtil::check_path_stats(rowsets, rowsets[0], tablet)
                        .ok());
}
// Writes five rowsets into the fixture tablet and vertically merges them into one output rowset.
// It then probes Segment::get_data_type_of on the output segment for a path in the sparse
// column, a path absent from the segment, the sparse column itself, a path with nested parts,
// and a materialized subcolumn.
TEST_F(SchemaUtilRowsetTest, collect_path_stats_and_get_extended_compaction_schema) {
    all_path_stats.clear();
    // 1.create tablet schema
    TabletSchemaPB schema_pb;
    construct_column(schema_pb.add_column(), 0, "INT", "key", true);
    construct_column(schema_pb.add_column(), 1, "VARIANT", "v1");
    construct_column(schema_pb.add_column(), 2, "STRING", "v2");
    construct_column(schema_pb.add_column(), 3, "VARIANT", "v3");
    construct_column(schema_pb.add_column(), 4, "INT", "v4");
    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
    tablet_schema->init_from_pb(schema_pb);
    // 2. create tablet
    TabletMetaSharedPtr tablet_meta(new TabletMeta(tablet_schema));
    bool external_segment_meta_used_default = rand() % 2 == 0;
    std::cout << "external_segment_meta_used_default: " << external_segment_meta_used_default
              << std::endl;
    tablet_schema->set_external_segment_meta_used_default(external_segment_meta_used_default);
    tablet_meta->_tablet_id = 12345;
    _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, _data_dir.get());
    EXPECT_TRUE(_tablet->init().ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
    // 3. create rowset
    std::vector<RowsetSharedPtr> rowsets;
    for (int i = 0; i < 5; i++) {
        const auto& res = RowsetFactory::create_rowset_writer(
                *_engine_ref,
                rowset_writer_context(_data_dir, tablet_schema, _tablet->tablet_path(), _tablet),
                false);
        // Fatal: res.value() must not be called on an error result.
        ASSERT_TRUE(res.has_value()) << res.error();
        const auto& rowset_writer = res.value();
        auto rowset = create_rowset(rowset_writer, tablet_schema);
        ASSERT_TRUE(rowset != nullptr);
        EXPECT_TRUE(_tablet->add_rowset(rowset).ok());
        rowsets.push_back(rowset);
    }
    // 4. get compaction schema
    // NOTE(review): get_extended_compaction_schema() is not called here, so compaction_schema
    // aliases tablet_schema. Steps 4 and 5 only see extracted columns if the tablet schema
    // already contains them. Confirm whether dropping the call was intentional.
    TabletSchemaSPtr compaction_schema = tablet_schema;
    for (const auto& column : compaction_schema->columns()) {
        if (column->is_extracted_column()) {
            EXPECT_FALSE(column->is_variant_type());
        }
    }
    // 5. check compaction schema
    std::unordered_map<int32_t, std::vector<std::string>> compaction_schema_map;
    for (const auto& column : compaction_schema->columns()) {
        if (column->parent_unique_id() > 0) {
            compaction_schema_map[column->parent_unique_id()].push_back(column->name());
        }
    }
    for (auto& [uid, paths] : compaction_schema_map) {
        // Fatal: paths[0..3] are indexed below.
        ASSERT_EQ(paths.size(), 4U);
        std::sort(paths.begin(), paths.end());
        EXPECT_TRUE(paths[0].ends_with("__DORIS_VARIANT_SPARSE__"));
        EXPECT_TRUE(paths[1].ends_with("key0"));
        EXPECT_TRUE(paths[2].ends_with("key1"));
        EXPECT_TRUE(paths[3].ends_with("key2"));
    }
    // 6.compaction for output rs
    // create input rowset reader
    std::vector<RowsetReaderSharedPtr> input_rs_readers;
    for (auto& rowset : rowsets) {
        RowsetReaderSharedPtr rs_reader;
        ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
        input_rs_readers.push_back(std::move(rs_reader));
    }
    // create output rowset writer
    auto create_rowset_writer_context = [this](TabletSchemaSPtr tablet_schema,
                                               const SegmentsOverlapPB& overlap,
                                               uint32_t max_rows_per_segment, Version version) {
        // Separate id range from the file-level inc_id used for input rowsets.
        static int64_t inc_id = 12345;
        RowsetWriterContext rowset_writer_context;
        RowsetId rowset_id;
        rowset_id.init(inc_id);
        rowset_writer_context.rowset_id = rowset_id;
        rowset_writer_context.rowset_type = BETA_ROWSET;
        rowset_writer_context.rowset_state = VISIBLE;
        rowset_writer_context.tablet_schema = tablet_schema;
        rowset_writer_context.tablet_path = _tablet->tablet_path();
        rowset_writer_context.data_dir = _data_dir.get();
        rowset_writer_context.tablet_id = _tablet->tablet_id();
        rowset_writer_context.tablet = _tablet;
        rowset_writer_context.version = version;
        rowset_writer_context.segments_overlap = overlap;
        rowset_writer_context.max_rows_per_segment = max_rows_per_segment;
        rowset_writer_context.write_type = DataWriteType::TYPE_COMPACTION;
        inc_id++;
        return rowset_writer_context;
    };
    auto writer_context = create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
                                                       {0, rowsets.back()->end_version()});
    auto res_ = RowsetFactory::create_rowset_writer(*_engine_ref, writer_context, true);
    ASSERT_TRUE(res_.has_value()) << res_.error();
    auto output_rs_writer = std::move(res_).value();
    Merger::Statistics stats;
    RowIdConversion rowid_conversion;
    stats.rowid_conversion = &rowid_conversion;
    auto s = Merger::vertical_merge_rowsets(_tablet, ReaderType::READER_BASE_COMPACTION,
                                            *tablet_schema, input_rs_readers,
                                            output_rs_writer.get(), 100, 5, &stats);
    ASSERT_TRUE(s.ok()) << s;
    RowsetSharedPtr out_rowset;
    EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
    ASSERT_TRUE(out_rowset);
    // check no variant subcolumns in output rowset
    for (const auto& column : out_rowset->tablet_schema()->columns()) {
        EXPECT_FALSE(column->is_extracted_column());
    }
    // 7. check output rowset
    EXPECT_TRUE(variant_util::VariantCompactionUtil::check_path_stats(rowsets, out_rowset, _tablet)
                        .ok());
    // get_data_type_of check
    auto file_path =
            local_segment_path(_tablet->tablet_path(), out_rowset->rowset_id().to_string(), 0);
    OlapReaderStatistics olap_reader_stats;
    std::shared_ptr<Segment> segment;
    auto st = Segment::open(io::global_local_filesystem(), file_path, _tablet->tablet_id(), 0,
                            out_rowset->rowset_id(), out_rowset->tablet_schema(),
                            io::FileReaderOptions(), &segment, InvertedIndexFileInfo(),
                            &olap_reader_stats);
    // Fatal: `segment` is dereferenced below.
    ASSERT_TRUE(st.ok()) << st.msg();
    TabletColumn subcolumn_in_sparse;
    subcolumn_in_sparse.set_name("v1.key3");
    subcolumn_in_sparse.set_type(FieldType::OLAP_FIELD_TYPE_STRING);
    subcolumn_in_sparse.set_unique_id(-1);
    subcolumn_in_sparse.set_parent_unique_id(1);
    subcolumn_in_sparse.set_path_info(PathInData("v1.key3"));
    subcolumn_in_sparse.set_variant_max_subcolumns_count(3);
    subcolumn_in_sparse.set_is_nullable(true);
    st = segment->_create_column_meta_once(&olap_reader_stats);
    ASSERT_TRUE(st.ok()) << st.msg();
    // key3 is in the sparse column, return variant type
    StorageReadOptions type_opts;
    type_opts.tablet_schema = out_rowset->tablet_schema();
    type_opts.io_ctx.reader_type = ReaderType::READER_QUERY;
    auto data_type = segment->get_data_type_of(subcolumn_in_sparse, type_opts);
    ASSERT_TRUE(data_type != nullptr);
    EXPECT_TRUE(data_type->get_storage_field_type() == FieldType::OLAP_FIELD_TYPE_VARIANT);
    subcolumn_in_sparse.set_name("v1.keya");
    subcolumn_in_sparse.set_type(FieldType::OLAP_FIELD_TYPE_STRING);
    subcolumn_in_sparse.set_path_info(PathInData("v1.keya"));
    // keya is not in the segment, return string type;
    data_type = segment->get_data_type_of(subcolumn_in_sparse, type_opts);
    ASSERT_TRUE(data_type != nullptr);
    EXPECT_TRUE(data_type->get_storage_field_type() == FieldType::OLAP_FIELD_TYPE_STRING);
    // path contains sparse marker, should return default column type (OLAP_FIELD_TYPE_MAP)
    TabletColumn sparse_typed_col;
    sparse_typed_col.set_name("v1.__DORIS_VARIANT_SPARSE__");
    sparse_typed_col.set_type(FieldType::OLAP_FIELD_TYPE_MAP);
    sparse_typed_col.set_unique_id(-1);
    sparse_typed_col.set_parent_unique_id(1);
    sparse_typed_col.set_path_info(PathInData(std::string("v1.") + SPARSE_COLUMN_PATH));
    sparse_typed_col.set_variant_max_subcolumns_count(3);
    sparse_typed_col.set_is_nullable(true);
    // add key/value subcolumns for MAP to satisfy DataTypeFactory checks
    // key: STRING, value: INT
    {
        TabletColumn key_col;
        key_col.set_name("__key");
        key_col.set_type(FieldType::OLAP_FIELD_TYPE_STRING);
        key_col.set_is_nullable(false);
        sparse_typed_col.add_sub_column(key_col);
        TabletColumn value_col;
        value_col.set_name("__value");
        value_col.set_type(FieldType::OLAP_FIELD_TYPE_INT);
        value_col.set_is_nullable(true);
        sparse_typed_col.add_sub_column(value_col);
    }
    data_type = segment->get_data_type_of(sparse_typed_col, type_opts);
    ASSERT_TRUE(data_type != nullptr);
    EXPECT_TRUE(data_type->get_storage_field_type() == FieldType::OLAP_FIELD_TYPE_MAP);
    subcolumn_in_sparse.set_name("v1.keyb");
    subcolumn_in_sparse.set_type(FieldType::OLAP_FIELD_TYPE_INT);
    subcolumn_in_sparse.set_path_info(PathInData("v1.keyb"));
    subcolumn_in_sparse._column_path->has_nested = true;
    // keyb has nested part, return int type;
    data_type = segment->get_data_type_of(subcolumn_in_sparse, type_opts);
    ASSERT_TRUE(data_type != nullptr);
    EXPECT_TRUE(data_type->get_storage_field_type() == FieldType::OLAP_FIELD_TYPE_INT);
    // key1 is in the subcolumns, return string type;
    subcolumn_in_sparse.set_name("v1.key1");
    subcolumn_in_sparse.set_type(FieldType::OLAP_FIELD_TYPE_STRING);
    subcolumn_in_sparse.set_path_info(PathInData("v1.key1"));
    data_type = segment->get_data_type_of(subcolumn_in_sparse, type_opts);
    ASSERT_TRUE(data_type != nullptr);
    EXPECT_TRUE(data_type->get_storage_field_type() == FieldType::OLAP_FIELD_TYPE_STRING);
    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
}
// Shared setup for the subcolumn-writer test. It:
//   1. builds a schema whose VARIANT columns v1 and v3 each carry a typed child "key0";
//   2. writes one rowset into a private DataDir rooted at `_absolute_dir`;
//   3. checks aggregate_path_to_stats against the counts recorded in all_path_stats;
//   4. returns the schema produced by get_extended_compaction_schema.
// The tablet and DataDir are local and released on return; only the schema outlives the call.
// ASSERT_* cannot be used here (non-void function), so failures are recorded with EXPECT_* and
// the code skips work that would otherwise dereference invalid results.
TabletSchemaSPtr create_compaction_schema_common(StorageEngine* _engine_ref,
                                                 std::string _absolute_dir) {
    all_path_stats.clear();
    // 1.create tablet schema
    TabletSchemaPB schema_pb;
    construct_column(schema_pb.add_column(), 0, "INT", "key", true);
    construct_column(schema_pb.add_column(), 1, "VARIANT", "v1", false, true);
    construct_column(schema_pb.add_column(), 2, "STRING", "v2");
    construct_column(schema_pb.add_column(), 3, "VARIANT", "v3", false, true);
    construct_column(schema_pb.add_column(), 4, "INT", "v4");
    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
    tablet_schema->init_from_pb(schema_pb);
    // 2. create tablet
    TabletMetaSharedPtr tablet_meta(new TabletMeta(tablet_schema));
    bool external_segment_meta_used_default = rand() % 2 == 0;
    std::cout << "external_segment_meta_used_default: " << external_segment_meta_used_default
              << std::endl;
    tablet_schema->set_external_segment_meta_used_default(external_segment_meta_used_default);
    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(_absolute_dir).ok());
    std::unique_ptr<DataDir> data_dir = std::make_unique<DataDir>(*_engine_ref, _absolute_dir);
    static_cast<void>(data_dir->update_capacity());
    Status st1 = data_dir->init(true);
    EXPECT_TRUE(st1.ok()) << st1.msg();
    std::shared_ptr<Tablet> tablet =
            std::make_shared<Tablet>(*_engine_ref, tablet_meta, data_dir.get());
    EXPECT_TRUE(tablet->init().ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());
    // 3. create rowset
    std::vector<RowsetSharedPtr> rowsets;
    for (int i = 0; i < 1; i++) {
        const auto& res = RowsetFactory::create_rowset_writer(
                *_engine_ref,
                rowset_writer_context(data_dir, tablet_schema, tablet->tablet_path(), tablet),
                false);
        EXPECT_TRUE(res.has_value()) << res.error();
        if (!res.has_value()) {
            continue; // never call value() on an error result
        }
        const auto& rowset_writer = res.value();
        auto rowset = create_rowset(rowset_writer, tablet_schema);
        if (rowset == nullptr) {
            continue;
        }
        EXPECT_TRUE(tablet->add_rowset(rowset).ok());
        rowsets.push_back(rowset);
    }
    std::unordered_map<int32_t, variant_util::PathToNoneNullValues> path_stats;
    for (const auto& rowset : rowsets) {
        auto st = variant_util::VariantCompactionUtil::aggregate_path_to_stats(rowset, &path_stats);
        EXPECT_TRUE(st.ok()) << st.msg();
    }
    // Every aggregated (uid, path) count must match what the data generator recorded.
    for (const auto& [uid, stats_of_column] : path_stats) {
        for (const auto& [path, size] : stats_of_column) {
            EXPECT_EQ(all_path_stats[uid][path], size);
        }
    }
    // 4. get compaction schema
    TabletSchemaSPtr compaction_schema = tablet_schema;
    auto st = variant_util::VariantCompactionUtil::get_extended_compaction_schema(
            rowsets, compaction_schema);
    EXPECT_TRUE(st.ok()) << st.msg();
    // 5. check compaction schema
    std::unordered_map<int32_t, std::vector<std::string>> compaction_schema_map;
    for (const auto& column : compaction_schema->columns()) {
        if (column->parent_unique_id() > 0) {
            compaction_schema_map[column->parent_unique_id()].push_back(column->name());
        }
    }
    for (auto& [uid, paths] : compaction_schema_map) {
        std::sort(paths.begin(), paths.end());
        // Print every path (bounds-safe) before checking the expected count.
        for (size_t i = 0; i < paths.size(); ++i) {
            std::cout << "path[" << i << "]: " << paths[i] << std::endl;
        }
        EXPECT_EQ(paths.size(), 5U);
        // Only index into `paths` when it has the expected size.
        if (paths.size() != 5) {
            continue;
        }
        EXPECT_TRUE(paths[0].ends_with("__DORIS_VARIANT_SPARSE__"));
        EXPECT_TRUE(paths[2].ends_with("key1"));
        EXPECT_TRUE(paths[3].ends_with("key2"));
        EXPECT_TRUE(paths[4].ends_with("key3"));
    }
    return compaction_schema;
}
// Builds a compaction schema (create_compaction_schema_common), creates a variant column writer
// for compaction_schema->column(2) and pushes one generated variant row through it.
TEST_F(SchemaUtilRowsetTest, some_test_for_subcolumn_writer) {
    std::string absolute_dir = _curreent_dir + std::string("/ut_dir/schema_util_rows2");
    TabletSchemaSPtr compaction_schema = create_compaction_schema_common(_engine_ref, absolute_dir);
    // 6. create variantSubColumnWriter
    // 6.1. Create file writer
    io::FileWriterPtr file_writer;
    std::string new_tablet_path = absolute_dir + "/tmp_data/";
    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(new_tablet_path).ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(new_tablet_path).ok());
    auto file_path = local_segment_path(new_tablet_path, "0", 0);
    auto st1 = io::global_local_filesystem()->create_file(file_path, &file_writer);
    // Fatal: file_writer is dereferenced below.
    ASSERT_TRUE(st1.ok()) << st1.msg();
    SegmentFooterPB footer;
    ColumnWriterOptions opts;
    opts.meta = footer.add_columns();
    opts.compression_type = CompressionTypePB::LZ4;
    opts.file_writer = file_writer.get();
    opts.footer = &footer;
    RowsetWriterContext rowset_ctx;
    rowset_ctx.write_type = DataWriteType::TYPE_COMPACTION;
    opts.rowset_ctx = &rowset_ctx;
    opts.rowset_ctx->tablet_schema = compaction_schema;
    // create sub column with pathinfo
    std::cout << compaction_schema->dump_structure() << std::endl;
    // this is v1.key1
    TabletColumn column = compaction_schema->column(2);
    _init_column_meta(opts.meta, 0, column, CompressionTypePB::LZ4);
    std::unique_ptr<ColumnWriter> writer;
    ASSERT_TRUE(
            ColumnWriter::create_variant_writer(opts, &column, file_writer.get(), &writer).ok());
    ASSERT_TRUE(writer != nullptr);
    ASSERT_TRUE(writer->init().ok());
    // dynamic_cast yields nullptr on a type mismatch, so this is a real type check; comparing
    // an assert_cast result against nullptr is not.
    auto* variant_subcolumn_writer = dynamic_cast<VariantSubcolumnWriter*>(writer.get());
    ASSERT_TRUE(variant_subcolumn_writer != nullptr);
    // then we can do some thing for sub_writer
    // estimate buffer size
    auto size = variant_subcolumn_writer->estimate_buffer_size();
    std::cout << "size: " << size << std::endl;
    // append data
    auto insert_object = ColumnVariant::create(true);
    fill_varaint_column(insert_object, 1, 1);
    std::cout << insert_object->debug_string() << std::endl;
    auto variant_column_data = std::make_unique<VariantColumnData>();
    variant_column_data->column_data = insert_object.get();
    variant_column_data->row_pos = 0;
    // The writer receives the VariantColumnData through its generic byte-pointer interface.
    const uint8_t* data = reinterpret_cast<const uint8_t*>(variant_column_data.get());
    EXPECT_TRUE(variant_subcolumn_writer->append_data(&data, 1));
    // write the appended data through the writer
    EXPECT_TRUE(variant_subcolumn_writer->write_data().ok());
}
// Writes five rowsets, extends the compaction schema from them and vertically merges them into
// one output rowset. The output schema must carry no extracted columns and check_path_stats
// must succeed.
TEST_F(SchemaUtilRowsetTest, typed_path_to_sparse_column) {
    all_path_stats.clear();
    // 1.create tablet schema
    TabletSchemaPB schema_pb;
    construct_column(schema_pb.add_column(), 0, "INT", "key", true);
    construct_column(schema_pb.add_column(), 1, "VARIANT", "v1");
    construct_column(schema_pb.add_column(), 2, "STRING", "v2");
    construct_column(schema_pb.add_column(), 3, "VARIANT", "v3");
    construct_column(schema_pb.add_column(), 4, "INT", "v4");
    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
    tablet_schema->init_from_pb(schema_pb);
    // 2. create tablet
    TabletMetaSharedPtr tablet_meta(new TabletMeta(tablet_schema));
    bool external_segment_meta_used_default = rand() % 2 == 0;
    std::cout << "external_segment_meta_used_default: " << external_segment_meta_used_default
              << std::endl;
    tablet_schema->set_external_segment_meta_used_default(external_segment_meta_used_default);
    _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, _data_dir.get());
    EXPECT_TRUE(_tablet->init().ok());
    EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
    // 3. create rowset
    std::vector<RowsetSharedPtr> rowsets;
    for (int i = 0; i < 5; i++) {
        const auto& res = RowsetFactory::create_rowset_writer(
                *_engine_ref,
                rowset_writer_context(_data_dir, tablet_schema, _tablet->tablet_path(), _tablet),
                false);
        // Fatal: res.value() must not be called on an error result.
        ASSERT_TRUE(res.has_value()) << res.error();
        const auto& rowset_writer = res.value();
        auto rowset = create_rowset(rowset_writer, tablet_schema);
        ASSERT_TRUE(rowset != nullptr);
        EXPECT_TRUE(_tablet->add_rowset(rowset).ok());
        rowsets.push_back(rowset);
    }
    // 4. get compaction schema
    TabletSchemaSPtr compaction_schema = tablet_schema;
    auto st = variant_util::VariantCompactionUtil::get_extended_compaction_schema(
            rowsets, compaction_schema);
    EXPECT_TRUE(st.ok()) << st.msg();
    for (const auto& column : compaction_schema->columns()) {
        if (column->is_extracted_column()) {
            EXPECT_FALSE(column->is_variant_type());
        }
    }
    // 5. check compaction schema
    std::unordered_map<int32_t, std::vector<std::string>> compaction_schema_map;
    for (const auto& column : compaction_schema->columns()) {
        if (column->parent_unique_id() > 0) {
            compaction_schema_map[column->parent_unique_id()].push_back(column->name());
        }
    }
    for (auto& [uid, paths] : compaction_schema_map) {
        // Fatal: paths[0..3] are indexed below.
        ASSERT_EQ(paths.size(), 4U);
        std::sort(paths.begin(), paths.end());
        EXPECT_TRUE(paths[0].ends_with("__DORIS_VARIANT_SPARSE__"));
        EXPECT_TRUE(paths[1].ends_with("key0"));
        EXPECT_TRUE(paths[2].ends_with("key1"));
        EXPECT_TRUE(paths[3].ends_with("key2"));
    }
    // 6.compaction for output rs
    // create input rowset reader
    std::vector<RowsetReaderSharedPtr> input_rs_readers;
    for (auto& rowset : rowsets) {
        RowsetReaderSharedPtr rs_reader;
        ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
        input_rs_readers.push_back(std::move(rs_reader));
    }
    // create output rowset writer
    auto create_rowset_writer_context = [this](TabletSchemaSPtr tablet_schema,
                                               const SegmentsOverlapPB& overlap,
                                               uint32_t max_rows_per_segment, Version version) {
        // Output ids start far above the file-level inc_id (which starts at 1000 and is used for
        // the input rowsets), so the output segment files cannot overwrite input segments in the
        // shared tablet directory.
        static int64_t inc_id = 100000;
        RowsetWriterContext rowset_writer_context;
        RowsetId rowset_id;
        rowset_id.init(inc_id);
        rowset_writer_context.rowset_id = rowset_id;
        rowset_writer_context.rowset_type = BETA_ROWSET;
        rowset_writer_context.rowset_state = VISIBLE;
        rowset_writer_context.tablet_schema = tablet_schema;
        // Write into the tablet directory (under _absolute_dir) so TearDown removes the output;
        // the parent of _absolute_dir is not cleaned up by the fixture.
        rowset_writer_context.tablet_path = _tablet->tablet_path();
        rowset_writer_context.data_dir = _data_dir.get();
        rowset_writer_context.tablet_id = _tablet->tablet_id();
        rowset_writer_context.tablet = _tablet;
        rowset_writer_context.version = version;
        rowset_writer_context.segments_overlap = overlap;
        rowset_writer_context.max_rows_per_segment = max_rows_per_segment;
        rowset_writer_context.write_type = DataWriteType::TYPE_COMPACTION;
        inc_id++;
        return rowset_writer_context;
    };
    auto writer_context = create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
                                                       {0, rowsets.back()->end_version()});
    auto res_ = RowsetFactory::create_rowset_writer(*_engine_ref, writer_context, true);
    ASSERT_TRUE(res_.has_value()) << res_.error();
    auto output_rs_writer = std::move(res_).value();
    Merger::Statistics stats;
    RowIdConversion rowid_conversion;
    stats.rowid_conversion = &rowid_conversion;
    auto s = Merger::vertical_merge_rowsets(_tablet, ReaderType::READER_BASE_COMPACTION,
                                            *tablet_schema, input_rs_readers,
                                            output_rs_writer.get(), 100, 5, &stats);
    ASSERT_TRUE(s.ok()) << s;
    RowsetSharedPtr out_rowset;
    EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
    ASSERT_TRUE(out_rowset);
    // check no variant subcolumns in output rowset
    for (const auto& column : out_rowset->tablet_schema()->columns()) {
        EXPECT_FALSE(column->is_extracted_column());
    }
    // 7. check output rowset
    EXPECT_TRUE(variant_util::VariantCompactionUtil::check_path_stats(rowsets, out_rowset, _tablet)
                        .ok());
}