blob: d1458c6258b3d1b4a2df1951178ebdabfef51c26 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <incbin.h>
#include <testConfig.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeNullable.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Parser/LocalExecutor.h>
#include <Parser/RelParsers/WriteRelParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Processors/Transforms/DeduplicationTokenTransforms.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/MergeTree/SparkMergeTreeMeta.h>
#include <Storages/MergeTree/SparkMergeTreeWriteSettings.h>
#include <Storages/MergeTree/SparkMergeTreeWriter.h>
#include <Storages/StorageMergeTree.h>
#include <gtest/gtest.h>
#include <substrait/algebra.pb.h>
#include <tests/utils/gluten_test_util.h>
#include <Common/BlockTypeUtils.h>
#include <Common/DebugUtils.h>
#include <Common/QueryContext.h>
#include <Common/ThreadStatus.h>
// Handles for the ClickHouse settings read in this file via `settings[Setting::...]`.
// They are only declared here (`extern`); the definitions live in another translation unit.
namespace DB::Setting
{
extern const SettingsUInt64 max_parser_depth;
extern const SettingsUInt64 max_parser_backtracks;
extern const SettingsBool allow_settings_after_format_in_insert;
extern const SettingsUInt64 max_query_size;
extern const SettingsUInt64 min_insert_block_size_rows;
extern const SettingsUInt64 min_insert_block_size_bytes;
}
using namespace local_engine;
using namespace DB;
namespace
{
/// Recursively deletes `folder` (a file or a whole directory tree) if it exists.
/// std::filesystem::remove_all already returns 0 for a path that does not exist, so no
/// separate exists() probe is needed (that probe would also be a check-then-act race).
/// Throws std::filesystem::filesystem_error if the removal itself fails.
void do_remove(const std::string & folder)
{
    namespace fs = std::filesystem;
    fs::remove_all(fs::path(folder));
}

/// Builds the sample "person" chunk used by the ClickhouseMergeTree test: columns
/// id (INT()), name (STRING()) and age (Nullable(INT())); the 2nd and 5th ages are NULL.
/// The row count is taken from the id column rather than hard-coded, so it cannot
/// drift from the data below.
Chunk person_chunk()
{
    auto id = INT()->createColumn();
    for (const int value : {100, 200, 300, 400, 500, 600, 700})
        id->insert(value);

    auto name = STRING()->createColumn();
    for (const char * value : {"Joe", "Marry", "Mike", "Fred", "Albert", "Michelle", "Dan"})
        name->insert(value);

    auto age = makeNullable(INT())->createColumn();
    const Field null_field; // default-constructed Field, inserted as NULL into the nullable column
    for (const Field & value : {Field(30), null_field, Field(18), Field(50), null_field, Field(30), Field(50)})
        age->insert(value);

    // Capture the row count before the columns are moved out.
    const size_t num_rows = id->size();
    MutableColumns columns;
    columns.push_back(std::move(id));
    columns.push_back(std::move(name));
    columns.push_back(std::move(age));
    return {std::move(columns), num_rows};
}
}
/// Builds a plain ClickHouse StorageMergeTree from a CREATE TABLE statement and pushes the
/// 7-row person_chunk() through the MergeTreeSink returned by StorageMergeTree::write.
/// The table directory "WritePipeline_MergeTree" is removed when the test scope exits.
TEST(MergeTree, ClickhouseMergeTree)
{
    ThreadStatus thread_status;
    const auto context = DB::Context::createCopy(QueryContext::globalContext());
    const Settings & settings = context->getSettingsRef();
    const std::string query
        = R"(create table if not exists person (id Int32, Name String, Age Nullable(Int32)) engine = MergeTree() ORDER BY id)";
    const char * begin = query.data();
    const char * end = query.data() + query.size();
    ParserQuery parser(end, settings[Setting::allow_settings_after_format_in_insert]);
    ASTPtr ast = parseQuery(
        parser,
        begin,
        end,
        "",
        settings[Setting::max_query_size],
        settings[Setting::max_parser_depth],
        settings[Setting::max_parser_backtracks]);

    // Everything below dereferences the CREATE query, its column list and its ORDER BY
    // clause, so stop the test (ASSERT, not EXPECT) if any of them is missing.
    ASSERT_TRUE(ast->as<ASTCreateQuery>());
    auto & create = ast->as<ASTCreateQuery &>();
    ASSERT_TRUE(create.columns_list && create.columns_list->columns);
    ASSERT_TRUE(create.storage && create.storage->order_by);

    ColumnsDescription column_descriptions
        = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, context, LoadingStrictnessLevel::CREATE);
    StorageInMemoryMetadata metadata;
    metadata.setColumns(column_descriptions);
    metadata.setComment("args.comment");
    // The statement has no PARTITION BY, so the partition key is built from an empty AST.
    ASTPtr partition_by_key;
    metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, context);
    MergeTreeData::MergingParams merging_params;
    merging_params.mode = MergeTreeData::MergingParams::Ordinary;
    /// This merging param maybe used as part of sorting key
    std::optional<String> merging_param_key_arg;
    /// Get sorting key from engine arguments.
    ///
    /// NOTE: store merging_param_key_arg as additional key column. We do it
    /// before storage creation. After that storage will just copy this
    /// column if sorting key will be changed.
    metadata.sorting_key
        = KeyDescription::getSortingKeyFromAST(create.storage->order_by->ptr(), metadata.columns, context, merging_param_key_arg);
    std::unique_ptr<MergeTreeSettings> storage_settings = std::make_unique<MergeTreeSettings>(context->getMergeTreeSettings());
    // Fixed (non-random) UUID whose version nibble is 4 and whose variant bits are 0b10.
    UUID uuid;
    UUIDHelpers::getHighBytes(uuid) = 0xffffffffffff0fffull | 0x0000000000004000ull;
    UUIDHelpers::getLowBytes(uuid) = 0x3fffffffffffffffull | 0x8000000000000000ull;
    SCOPE_EXIT({ do_remove("WritePipeline_MergeTree"); });
    auto merge_tree = std::make_shared<StorageMergeTree>(
        StorageID("", "", uuid),
        "WritePipeline_MergeTree",
        metadata,
        LoadingStrictnessLevel::CREATE,
        context,
        "",
        merging_params,
        std::move(storage_settings));
    auto header = toShared(Block{{INT(), "id"}, {STRING(), "Name"}, {makeNullable(INT()), "Age"}});
    DB::Squashing squashing(header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]);
    squashing.add(person_chunk());
    auto x = Squashing::squash(squashing.flush());
    // NOTE(review): the sink presumably requires a deduplication TokenInfo among the chunk infos — confirm.
    x.getChunkInfos().add(std::make_shared<DeduplicationToken::TokenInfo>());
    ASSERT_EQ(7, x.getNumRows());
    ASSERT_EQ(3, x.getNumColumns());
    auto metadata_snapshot = std::make_shared<const StorageInMemoryMetadata>(metadata);
    ASTPtr none;
    // write() returns a generic sink; verify it really is a MergeTreeSink instead of relying on
    // an unchecked static_pointer_cast (which would be undefined behaviour on a mismatch).
    auto sink = std::dynamic_pointer_cast<MergeTreeSink>(merge_tree->write(none, metadata_snapshot, context, false));
    ASSERT_TRUE(sink);
    sink->consume(x);
    sink->onFinish();
}
// JSON fixtures embedded into the test binary (incbin) and read back via EMBEDDED_PLAN(...)
// in the SparkMergeTree test: the local and the HDFS MergeTree table descriptions
// (parsed as substrait::ReadRel::ExtensionTable), and the read plan whose output is written.
INCBIN(_1_mergetree_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree.json");
INCBIN(_1_mergetree_hdfs_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree_hdfs.json");
INCBIN(_1_read_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_plan.json");
/// Writes every block produced by the embedded read plan (_1_read_, over a TPCH lineitem
/// parquet file) into the local MergeTree table described by _1_mergetree_ through
/// SparkMergeTreeWriter, then checks that the table described by _1_mergetree_hdfs_
/// resolves to a storage whose disk is remote.
TEST(MergeTree, SparkMergeTree)
{
    ThreadStatus thread_status;
    const auto context = DB::Context::createCopy(QueryContext::globalContext());
    const auto extension_table = local_engine::JsonStringToMessage<substrait::ReadRel::ExtensionTable>(EMBEDDED_PLAN(_1_mergetree_));
    MergeTreeTableInstance merge_tree_table(extension_table);
    EXPECT_EQ(merge_tree_table.database, "default");
    EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree");
    EXPECT_EQ(merge_tree_table.relative_path, "lineitem_mergetree");
    EXPECT_EQ(merge_tree_table.table_configs.storage_policy, "default");
    // Start from a clean table directory (e.g. leftovers from a previous run).
    do_remove(merge_tree_table.relative_path);
    const auto dest_storage = merge_tree_table.getStorage(QueryContext::globalMutableContext());
    // Dereferenced on the next line: stop the test instead of crashing if it is missing.
    ASSERT_TRUE(dest_storage);
    EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote());
    constexpr std::string_view split_template
        = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})";
    constexpr std::string_view file{GLUTEN_SOURCE_TPCH_URI("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")};
    SparkMergeTreeWritePartitionSettings gm_write_settings{
        .part_name_prefix{"this_is_prefix"},
    };
    gm_write_settings.set(context);
    auto writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, context, SparkMergeTreeWriter::CPP_UT_JOB_ID);
    SparkMergeTreeWriter & spark_merge_tree_writer = *writer;
    auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_1_read_), split_template, file);
    // The do/while below pulls a block before checking hasNext() again, so an empty
    // executor must end the test here rather than let nextColumnar() run on no data.
    ASSERT_TRUE(local_executor->hasNext());
    do
    {
        spark_merge_tree_writer.write(*local_executor->nextColumnar());
    } while (local_executor->hasNext());
    spark_merge_tree_writer.close();

    // HDFS-backed table: only its parsed metadata and storage resolution are checked.
    {
        const auto extension_table_hdfs
            = local_engine::JsonStringToMessage<substrait::ReadRel::ExtensionTable>(EMBEDDED_PLAN(_1_mergetree_hdfs_));
        MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs);
        EXPECT_EQ(merge_tree_table_hdfs.database, "default");
        EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs");
        EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs");
        EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main");
        const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext());
        ASSERT_TRUE(dest_storage_hdfs);
        EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote());
    }
}
// Default input JSON for writeMerge(): local file references are rewritten by replaceLocalFilesWithTPCH().
INCBIN(_3_mergetree_plan_input_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/lineitem_parquet_input.json");
namespace
{
/// Runs `json_plan` inside a fresh "gtest_mergetree" query context and hands every block the
/// executor produces to `callback`.
///
/// @param json_plan     Substrait plan in JSON form.
/// @param outputPath    Assigned to GlutenWriteSettings::task_write_tmp_dir.
/// @param test_settings Extra (name, value) settings applied to the query context first.
/// @param callback      Invoked once per block returned by the executor.
/// @param input         Input JSON for the plan; when absent, the embedded
///                      _3_mergetree_plan_input_ passed through replaceLocalFilesWithTPCH() is used.
void writeMerge(
    std::string_view json_plan,
    const std::string & outputPath,
    const TestSettings & test_settings,
    const std::function<void(const DB::Block &)> & callback,
    std::optional<std::string> input = std::nullopt)
{
    auto query_id = QueryContext::instance().initializeQuery("gtest_mergetree");
    // Finalize the query even if the executor or the callback throws.
    SCOPE_EXIT({ QueryContext::instance().finalizeQuery(query_id); });
    const auto context = QueryContext::instance().currentQueryContext();
    for (const auto & [name, value] : test_settings)
        context->setSetting(name, value);
    GlutenWriteSettings settings{.task_write_tmp_dir = outputPath};
    settings.set(context);
    SparkMergeTreeWritePartitionSettings partition_settings{.part_name_prefix = "_1"};
    partition_settings.set(context);

    // std::optional::value_or evaluates its argument eagerly; build the default TPCH input only
    // when the caller did not supply one, and move the caller's string otherwise.
    std::string input_json;
    if (input)
        input_json = std::move(*input);
    else
        input_json = replaceLocalFilesWithTPCH(EMBEDDED_PLAN(_3_mergetree_plan_input_));

    auto [_, local_executor] = test::create_plan_and_executor(json_plan, input_json, context);
    while (local_executor->hasNext())
        callback(*local_executor->nextColumnar());
}
}
// Plans run through writeMerge() by the Pipeline, PipelineWithPartition, lowcard and
// case_sensitive tests below (embedded via incbin, accessed with EMBEDDED_PLAN(...)).
INCBIN(_3_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/3_one_pipeline.json");
INCBIN(_4_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/4_one_pipeline.json");
INCBIN(_lowcard_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/lowcard.json");
INCBIN(_case_sensitive_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/case_sensitive.json");
/// Runs the 3_one_pipeline.json write plan into tmp/lineitem_mergetree with
/// min_insert_block_size_rows = 100000; each block returned must contain exactly one row.
TEST(MergeTree, Pipeline)
{
    // Knobs that were tried here before and can be re-enabled while debugging:
    //   context->setSetting("mergetree.max_num_part_per_merge_task", 1);
    //   {"optimize.minFileSize", 1024 * 1024 * 10} as an extra test setting.
    const auto expect_single_row = [](const DB::Block & block)
    {
        EXPECT_EQ(1, block.rows());
        std::cerr << debug::verticalShowString(block, 10, 50) << std::endl;
    };
    writeMerge(EMBEDDED_PLAN(_3_mergetree_plan_), "tmp/lineitem_mergetree", {{"min_insert_block_size_rows", 100000}}, expect_single_row);
}
/// Runs the partitioned 4_one_pipeline.json write plan into tmp/lineitem_mergetree_p with
/// no extra settings; each block returned must contain 3815 rows.
TEST(MergeTree, PipelineWithPartition)
{
    const auto expect_3815_rows = [](const DB::Block & block)
    {
        EXPECT_EQ(3815, block.rows());
        std::cerr << debug::showString(block, 50, 50) << std::endl;
    };
    writeMerge(EMBEDDED_PLAN(_4_mergetree_plan_), "tmp/lineitem_mergetree_p", {}, expect_3815_rows);
}
/// Runs the lowcard.json write plan into tmp/lineitem_mergetre_lowcard with no extra
/// settings; each block returned must contain exactly one row.
TEST(MergeTree, lowcard)
{
    const auto expect_single_row = [](const DB::Block & block)
    {
        EXPECT_EQ(1, block.rows());
        std::cerr << debug::verticalShowString(block, 10, 50) << std::endl;
    };
    writeMerge(EMBEDDED_PLAN(_lowcard_plan_), "tmp/lineitem_mergetre_lowcard", {}, expect_single_row);
}
/// Runs the case_sensitive.json write plan into tmp/LINEITEM_MERGETREE_CASE_SENSITIVE.
/// Currently skipped (see the skip message); the body is kept so it compiles and can be
/// re-enabled once the TODO is resolved.
TEST(MergeTree, case_sensitive)
{
    // Report why the test is skipped instead of an anonymous skip in the test output.
    GTEST_SKIP() << "TODO: case_sensitive";
    writeMerge(
        EMBEDDED_PLAN(_case_sensitive_plan_),
        "tmp/LINEITEM_MERGETREE_CASE_SENSITIVE",
        {},
        [](const DB::Block & block)
        {
            EXPECT_EQ(1, block.rows());
            std::cerr << debug::showString(block, 20, 50) << std::endl;
        });
}