| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <incbin.h> |
| #include <testConfig.h> |
| #include <Core/Settings.h> |
| #include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h> |
| #include <Interpreters/Context.h> |
| #include <Interpreters/ExpressionActions.h> |
| #include <Parser/LocalExecutor.h> |
| #include <Parser/RelParsers/WriteRelParser.h> |
| #include <Parser/TypeParser.h> |
| #include <Parsers/ASTCreateQuery.h> |
| #include <Parsers/ParserCreateQuery.h> |
| #include <Parsers/parseQuery.h> |
| #include <Processors/Chunk.h> |
| #include <Storages/ObjectStorage/HDFS/Configuration.h> |
| #include <Storages/ObjectStorage/StorageObjectStorageSink.h> |
| #include <Storages/Output/NormalFileWriter.h> |
| #include <google/protobuf/wrappers.pb.h> |
| #include <gtest/gtest.h> |
| #include <substrait/plan.pb.h> |
| #include <tests/utils/gluten_test_util.h> |
| #include <Poco/StringTokenizer.h> |
| #include <Common/DebugUtils.h> |
| #include <Common/QueryContext.h> |
| |
| namespace DB::Setting |
| { |
| extern const SettingsUInt64 max_parser_depth; |
| extern const SettingsUInt64 max_parser_backtracks; |
| extern const SettingsBool allow_settings_after_format_in_insert; |
| extern const SettingsUInt64 max_query_size; |
| extern const SettingsUInt64 min_insert_block_size_rows; |
| extern const SettingsUInt64 min_insert_block_size_bytes; |
| } |
| |
| using namespace local_engine; |
| using namespace DB; |
| |
| Chunk testChunk() |
| { |
| auto nameCol = STRING()->createColumn(); |
| nameCol->insert("one"); |
| nameCol->insert("two"); |
| nameCol->insert("three"); |
| |
| auto valueCol = UINT()->createColumn(); |
| valueCol->insert(1); |
| valueCol->insert(2); |
| valueCol->insert(3); |
| MutableColumns x; |
| x.push_back(std::move(nameCol)); |
| x.push_back(std::move(valueCol)); |
| return {std::move(x), 3}; |
| } |
| |
| TEST(LocalExecutor, StorageObjectStorageSink) |
| { |
| /// 0. Create ObjectStorage for HDFS |
| auto settings = QueryContext::globalContext()->getSettingsRef(); |
| const std::string query |
| = R"(CREATE TABLE hdfs_engine_xxxx (name String, value UInt32) ENGINE=HDFS('hdfs://localhost:8020/clickhouse/test2', 'Parquet'))"; |
| DB::ParserCreateQuery parser; |
| std::string error_message; |
| const char * pos = query.data(); |
| auto ast = DB::tryParseQuery( |
| parser, |
| pos, |
| pos + query.size(), |
| error_message, |
| /* hilite = */ false, |
| "QUERY TEST", |
| /* allow_multi_statements = */ false, |
| 0, |
| settings[Setting::max_parser_depth], |
| settings[Setting::max_parser_backtracks], |
| true); |
| auto & create = ast->as<ASTCreateQuery &>(); |
| auto arg = create.storage->children[0]; |
| const auto * func = arg->as<const ASTFunction>(); |
| EXPECT_TRUE(func && func->name == "HDFS"); |
| |
| DB::StorageHDFSConfiguration config; |
| StorageObjectStorageConfiguration::initialize(config, arg->children[0]->children, QueryContext::globalContext(), false); |
| |
| const std::shared_ptr<DB::HDFSObjectStorage> object_storage |
| = std::dynamic_pointer_cast<DB::HDFSObjectStorage>(config.createObjectStorage(QueryContext::globalContext(), false)); |
| EXPECT_TRUE(object_storage != nullptr); |
| |
| RelativePathsWithMetadata files_with_metadata; |
| object_storage->listObjects("/clickhouse", files_with_metadata, 0); |
| |
| /// 1. Create ObjectStorageSink |
| auto config_cloned_ptr = std::make_shared<StorageHDFSConfiguration>(config); |
| DB::StorageObjectStorageSink sink{ |
| config_cloned_ptr->getPaths().back().path, |
| object_storage, |
| config_cloned_ptr, |
| {}, |
| toShared(Block{{STRING(), "name"}, {UINT(), "value"}}), |
| QueryContext::globalContext()}; |
| |
| /// 2. Create Chunk |
| auto chunk = testChunk(); |
| /// 3. comsume |
| sink.consume(chunk); |
| sink.onFinish(); |
| } |
| |
| |
| INCBIN(native_write, SOURCE_DIR "/utils/extern-local-engine/tests/json/native_write_plan.json"); |
| TEST(WritePipeline, SubstraitFileSink) |
| { |
| const auto context = DB::Context::createCopy(QueryContext::globalContext()); |
| GlutenWriteSettings settings{ |
| .task_write_tmp_dir = "file:///tmp/test_table/test", |
| .task_write_filename_pattern = "data.parquet", |
| }; |
| settings.set(context); |
| |
| constexpr std::string_view split_template |
| = R"({"items":[{"uriFile":"{replace_local_files}","length":"1399183","text":{"fieldDelimiter":"|","maxBlockSize":"8192"},"schema":{"names":["s_suppkey","s_name","s_address","s_nationkey","s_phone","s_acctbal","s_comment"],"struct":{"types":[{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"decimal":{"scale":2,"precision":15,"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; |
| constexpr std::string_view file{GLUTEN_SOURCE_URI("/backends-clickhouse/src/test/resources/csv-data/supplier.csv")}; |
| auto [plan, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(native_write), split_template, file, context); |
| |
| EXPECT_EQ(1, plan.relations_size()); |
| const substrait::PlanRel & root_rel = plan.relations().at(0); |
| EXPECT_TRUE(root_rel.has_root()); |
| EXPECT_TRUE(root_rel.root().input().has_write()); |
| |
| const substrait::WriteRel & write_rel = root_rel.root().input().write(); |
| EXPECT_TRUE(write_rel.has_named_table()); |
| |
| const substrait::NamedObjectWrite & named_table = write_rel.named_table(); |
| EXPECT_TRUE(write_rel.has_table_schema()); |
| const substrait::NamedStruct & table_schema = write_rel.table_schema(); |
| auto block = TypeParser::buildBlockFromNamedStruct(table_schema); |
| auto names = block.getNames(); |
| DB::Names expected{"s_suppkey", "s_name", "s_address", "s_nationkey", "s_phone", "s_acctbal", "s_comment111"}; |
| EXPECT_EQ(expected, names); |
| |
| auto partitionCols = collect_partition_cols(block, table_schema, {}); |
| DB::Names expected_partition_cols; |
| EXPECT_EQ(expected_partition_cols, partitionCols); |
| |
| EXPECT_TRUE(local_executor->hasNext()); |
| const Block & x = *local_executor->nextColumnar(); |
| std::cerr << debug::verticalShowString(x, 10, 50) << std::endl; |
| EXPECT_EQ(1, x.rows()); |
| const auto & col_a = *(x.getColumns()[0]); |
| EXPECT_EQ(settings.task_write_filename_pattern, col_a.getDataAt(0)); |
| const auto & col_b = *(x.getColumns()[1]); |
| EXPECT_EQ(WriteStatsBase::NO_PARTITION_ID, col_b.getDataAt(0)); |
| const auto & col_c = *(x.getColumns()[2]); |
| EXPECT_EQ(10000, col_c.getInt(0)); |
| } |
| |
| INCBIN(native_write_one_partition, SOURCE_DIR "/utils/extern-local-engine/tests/json/native_write_one_partition.json"); |
| |
| /*TEST(WritePipeline, SubstraitPartitionedFileSink) |
| { |
| const auto context = DB::Context::createCopy(QueryContext::globalContext()); |
| GlutenWriteSettings settings{ |
| .task_write_tmp_dir = "file:///tmp/test_table/test_partition", |
| .task_write_filename_pattern = "data.parquet", |
| }; |
| settings.set(context); |
| |
| constexpr std::string_view split_template |
| = R"({"items":[{"uriFile":"{replace_local_files}","length":"1399183","text":{"fieldDelimiter":"|","maxBlockSize":"8192"},"schema":{"names":["s_suppkey","s_name","s_address","s_nationkey","s_phone","s_acctbal","s_comment"],"struct":{"types":[{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"decimal":{"scale":2,"precision":15,"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; |
| constexpr std::string_view file{GLUTEN_SOURCE_DIR("/backends-clickhouse/src/test/resources/csv-data/supplier.csv")}; |
| auto [plan, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(native_write_one_partition), split_template, file, context); |
| |
| EXPECT_EQ(1, plan.relations_size()); |
| const substrait::PlanRel & root_rel = plan.relations().at(0); |
| EXPECT_TRUE(root_rel.has_root()); |
| EXPECT_TRUE(root_rel.root().input().has_write()); |
| |
| const substrait::WriteRel & write_rel = root_rel.root().input().write(); |
| EXPECT_TRUE(write_rel.has_named_table()); |
| |
| EXPECT_TRUE(write_rel.has_table_schema()); |
| const substrait::NamedStruct & table_schema = write_rel.table_schema(); |
| auto block = TypeParser::buildBlockFromNamedStruct(table_schema); |
| auto names = block.getNames(); |
| DB::Names expected{"s_suppkey", "s_name", "s_address", "s_phone", "s_acctbal", "s_comment", "s_nationkey"}; |
| EXPECT_EQ(expected, names); |
| |
| auto partitionCols = local_engine::collect_partition_cols(block, table_schema, {}); |
| DB::Names expected_partition_cols{"s_nationkey"}; |
| EXPECT_EQ(expected_partition_cols, partitionCols); |
| |
| EXPECT_TRUE(local_executor->hasNext()); |
| const Block & x = *local_executor->nextColumnar(); |
| debug::headBlock(x, 25); |
| EXPECT_EQ(25, x.rows()); |
| }*/ |
| |
| TEST(WritePipeline, ComputePartitionedExpression) |
| { |
| const auto context = DB::Context::createCopy(QueryContext::globalContext()); |
| |
| Block sample_block{{STRING(), "name"}, {UINT(), "s_nationkey"}}; |
| auto partition_by = SubstraitPartitionedFileSink::make_partition_expression({"s_nationkey", "name"}, sample_block); |
| // auto partition_by = printColumn("s_nationkey"); |
| |
| ASTs arguments(1, partition_by); |
| ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); |
| |
| |
| auto syntax_result = TreeRewriter(context).analyze(partition_by_string, sample_block.getNamesAndTypesList()); |
| auto partition_by_expr = ExpressionAnalyzer(partition_by_string, syntax_result, context).getActions(false); |
| auto partition_by_column_name = partition_by_string->getColumnName(); |
| |
| Chunk chunk = testChunk(); |
| const auto & columns = chunk.getColumns(); |
| Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); |
| block_with_partition_by_expr.setColumns(columns); |
| partition_by_expr->execute(block_with_partition_by_expr); |
| |
| size_t chunk_rows = chunk.getNumRows(); |
| EXPECT_EQ(3, chunk_rows); |
| |
| const auto * partition_by_result_column = block_with_partition_by_expr.getByName(partition_by_column_name).column.get(); |
| EXPECT_EQ("s_nationkey=1/name=one", partition_by_result_column->getDataAt(0)); |
| EXPECT_EQ("s_nationkey=2/name=two", partition_by_result_column->getDataAt(1)); |
| EXPECT_EQ("s_nationkey=3/name=three", partition_by_result_column->getDataAt(2)); |
| } |