blob: 021f59722e8965405567f79875990d6ae7bc025e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#if USE_PARQUET
#include <ranges>
#include <incbin.h>
#include <Columns/ColumnNullable.h>
#include <Core/Range.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesDecimal.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ActionsVisitor.h>
#include <Parser/LocalExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Storages/Parquet/ArrowUtils.h>
#include <Storages/Parquet/VectorizedParquetRecordReader.h>
#include <Storages/SubstraitSource/ParquetFormatFile.h>
#include <gtest/gtest.h>
#include <parquet/arrow/reader.h>
#include <parquet/level_conversion.h>
#include <tests/utils/gluten_test_util.h>
#include <Common/BlockTypeUtils.h>
#include <Common/DebugUtils.h>
#include <Common/QueryContext.h>
using namespace DB;
using namespace local_engine;
/// Reads the Parquet schema of the gtest data file `path` and checks that each primitive
/// column is mapped to the expected ClickHouse type.
/// The nested-type checks below are commented out and are currently not verified.
void readSchema(const String & path)
{
    auto name_and_types = test::readParquetSchema(test::gtest_data(path.c_str()));
    auto & factory = DataTypeFactory::instance();
    auto check_type = [&name_and_types, &factory](const String & column, const String & expect_str_type)
    {
        auto expect_type = factory.get(expect_str_type);
        auto name_and_type = name_and_types.tryGetByName(column);
        /// ASSERT (not EXPECT): when the column is missing, the dereference below must not run.
        /// ASSERT_* only returns from this lambda, so the remaining columns are still checked.
        ASSERT_TRUE(name_and_type) << "column not found:" << column;
        EXPECT_TRUE(name_and_type->type->equals(*expect_type))
            << "real_type:" << name_and_type->type->getName() << ", expect_type:" << expect_type->getName();
    };
    check_type("f_bool", "Nullable(UInt8)");
    check_type("f_byte", "Nullable(Int8)");
    check_type("f_short", "Nullable(Int16)");
    check_type("f_int", "Nullable(Int32)");
    check_type("f_long", "Nullable(Int64)");
    check_type("f_float", "Nullable(Float32)");
    check_type("f_double", "Nullable(Float64)");
    check_type("f_string", "Nullable(String)");
    check_type("f_binary", "Nullable(String)");
    check_type("f_decimal", "Nullable(Decimal(10, 2))");
    check_type("f_date", "Nullable(Date32)");
    check_type("f_timestamp", "Nullable(DateTime64(9))");
    // check_type("f_array", "Nullable(Array(Nullable(String)))");
    // check_type("f_array_array", "Nullable(Array(Nullable(Array(Nullable(String)))))");
    // check_type("f_array_map", "Nullable(Array(Nullable(Map(String, Nullable(Int64)))))");
    // check_type("f_array_struct", "Nullable(Array(Nullable(Tuple(a Nullable(String), b Nullable(Int64)))))");
    // check_type("f_map", "Nullable(Map(String, Nullable(Int64)))");
    // check_type("f_map_map", "Nullable(Map(String, Nullable(Map(String, Nullable(Int64)))))");
    // check_type("f_map_array", "Nullable(Map(String, Nullable(Array(Nullable(Int64)))))");
    // check_type("f_map_struct", "Nullable(Map(String, Nullable(Tuple(a Nullable(String), b Nullable(Int64)))))");
    // check_type("f_struct", "Nullable(Tuple(a Nullable(String), b Nullable(Int64)))");
    // check_type(
    // "f_struct_struct",
    // "Nullable(Tuple(a Nullable(String), b Nullable(Int64), c Nullable(Tuple(x Nullable(String), y Nullable(Int64)))))");
    // check_type("f_struct_array", "Nullable(Tuple(a Nullable(String), b Nullable(Int64), c Nullable(Array(Nullable(Int64)))))");
    // check_type("f_struct_map", "Nullable(Tuple(a Nullable(String), b Nullable(Int64), c Nullable(Map(String, Nullable(Int64)))))");
}
/// Builds a reader header for the Parquet file at `full_path`: its schema restricted to the
/// columns whose names are keys of `fields`. Only the keys are used; the values are ignored here.
BlockRowType createColumn(const String & full_path, const std::map<String, Field> & fields)
{
    const auto schema = test::readParquetSchema(full_path);
    auto wanted = [&fields](const auto & column) { return fields.contains(column.name); };
    return toBlockRowType(schema, wanted);
}
template <class InputFormat>
void readData(const String & path, const std::map<String, Field> & fields)
{
String full_path = test::gtest_data(path.c_str());
FormatSettings settings;
auto header = toShared(createColumn(full_path, fields));
ReadBufferFromFile in(full_path);
InputFormatPtr format;
auto parser_group
= std::make_shared<FormatFilterInfo>(nullptr, QueryContext::globalContext(), nullptr);
auto parser_shared_resources
= std::make_shared<FormatParserSharedResources>(QueryContext::globalContext()->getSettingsRef(), /*num_streams_=*/1);
if constexpr (std::is_same_v<InputFormat, DB::ParquetBlockInputFormat>)
format = std::make_shared<InputFormat>(in, header, settings, parser_shared_resources, parser_group, 8192);
else
format = std::make_shared<InputFormat>(in, header, settings);
QueryPipeline pipeline(format);
PullingPipelineExecutor reader(pipeline);
Block block;
EXPECT_TRUE(reader.pull(block));
EXPECT_EQ(block.rows(), 1);
const auto & columns = header->getColumnsWithTypeAndName();
for (const auto & column_header : columns)
{
const auto & name = column_header.name;
auto it = fields.find(name);
if (it != fields.end())
{
const auto & column = block.getByName(name);
auto field = (*column.column)[0];
auto expect_field = it->second;
EXPECT_TRUE(field == expect_field) << "field:" << toString(field) << ", expect_field:" << toString(expect_field);
}
}
}
TEST(ParquetRead, ReadSchema)
{
    /// The same schema expectations are checked against both the not-null and the null variant.
    for (const char * file : {"alltypes/alltypes_notnull.parquet", "alltypes/alltypes_null.parquet"})
        readSchema(file);
}
TEST(ParquetRead, VerifyPageindexReaderSupport)
{
    /// Result of ParquetFormatFile::onlyHasFlatType for the schema of the given gtest data file.
    auto only_flat = [](const char * file)
    {
        return ParquetFormatFile::onlyHasFlatType(toSampleBlock(test::readParquetSchema(test::gtest_data(file))));
    };

    EXPECT_FALSE(only_flat("alltypes/alltypes_notnull.parquet"));
    EXPECT_FALSE(only_flat("alltypes/alltypes_null.parquet"));
    EXPECT_FALSE(only_flat("array.parquet"));
    EXPECT_TRUE(only_flat("date.parquet"));
    EXPECT_TRUE(only_flat("datetime64.parquet"));
    EXPECT_TRUE(only_flat("decimal.parquet"));
    EXPECT_TRUE(only_flat("iris.parquet"));
    EXPECT_FALSE(only_flat("map.parquet"));
    EXPECT_TRUE(only_flat("sample.parquet"));
    EXPECT_FALSE(only_flat("struct.parquet"));
}
TEST(ParquetRead, ReadDataNotNull)
{
    /// Expected row-0 value of each column of alltypes_notnull.parquet, keyed by column name.
    /// Every key must appear only once: std::map's initializer-list constructor keeps the first
    /// occurrence and silently drops later duplicates.
    const std::map<String, Field> fields{
        {"f_array", Array{"hello", "world"}},
        {"f_bool", UInt8(1)},
        {"f_byte", Int8(1)},
        {"f_short", Int16(2)},
        {"f_int", Int32(3)},
        {"f_long", Int64(4)},
        {"f_float", Float32(5.5)},
        {"f_double", Float64(6.6)},
        {"f_string", "hello world"},
        {"f_binary", "hello world"},
        {"f_decimal", DecimalField<Decimal64>(777, 2)},
        {"f_date", Int32(18262)}, /// days since 1970-01-01, i.e. 2020-01-01
        {"f_timestamp", DecimalField<DateTime64>(1666162060000000L, 6)}, /// scale 6: microsecond ticks
        {
            "f_array_array",
            []() -> Field
            {
                Array res;
                res.push_back(Array{"hello"});
                res.push_back(Array{"world"});
                return std::move(res);
            }(),
        },
        {
            "f_array_map",
            []() -> Field
            {
                Array res;
                Map map;
                map.push_back(Tuple{"hello", Int64(1)});
                res.push_back(map);
                map.clear();
                map.push_back(Tuple{"world", Int64(2)});
                res.push_back(map);
                return std::move(res);
            }(),
        },
        {
            "f_array_struct",
            []() -> Field
            {
                Array res;
                res.push_back(Tuple{"hello", Int64(1)});
                res.push_back(Tuple{"world", Int64(2)});
                return std::move(res);
            }(),
        },
        {
            "f_map",
            []() -> Field
            {
                Map res;
                res.push_back(Tuple{"hello", Int64(1)});
                res.push_back(Tuple{"world", Int64(2)});
                return std::move(res);
            }(),
        },
        {
            "f_map_map",
            []() -> Field
            {
                Map nested_map;
                nested_map.push_back(Tuple{"world", Int64(3)});
                Map res;
                res.push_back(Tuple{"hello", std::move(nested_map)});
                return std::move(res);
            }(),
        },
        {
            "f_map_array",
            []() -> Field
            {
                Array array{Int64(1), Int64(2), Int64(3)};
                Map res;
                res.push_back(Tuple{"hello", std::move(array)});
                return std::move(res);
            }(),
        },
        {
            "f_map_struct",
            []() -> Field
            {
                Tuple tuple{"world", Int64(4)};
                Map res;
                res.push_back(Tuple{"hello", std::move(tuple)});
                return std::move(res);
            }(),
        },
        {
            "f_struct",
            []() -> Field
            {
                Tuple res{"hello world", Int64(5)};
                return std::move(res);
            }(),
        },
        {
            "f_struct_struct",
            []() -> Field
            {
                Tuple tuple{"world", Int64(6)};
                Tuple res{"hello", Int64(6), std::move(tuple)};
                return std::move(res);
            }(),
        },
        {
            "f_struct_array",
            []() -> Field
            {
                Array array{Int64(1), Int64(2), Int64(3)};
                Tuple res{"hello", Int64(7), std::move(array)};
                return std::move(res);
            }(),
        },
        {
            "f_struct_map",
            []() -> Field
            {
                Map map;
                map.push_back(Tuple{"world", Int64(9)});
                Tuple res{"hello", Int64(8), std::move(map)};
                return std::move(res);
            }(),
        },
    };
    readData<ParquetBlockInputFormat>("alltypes/alltypes_notnull.parquet", fields);
}
TEST(ParquetRead, ReadDataNull)
{
    /// NOTE(review): skipped unconditionally and the reason is not recorded here.
    /// Remove GTEST_SKIP() once reading NULL for every column below is expected to pass.
    GTEST_SKIP();
    /// Expected value for every listed column of alltypes_null.parquet: NULL.
    /// Keys must be unique: std::map's initializer-list constructor silently drops duplicates.
    const std::map<String, Field> fields{
        {"f_array", Null{}},         {"f_bool", Null{}},          {"f_byte", Null{}},          {"f_short", Null{}},
        {"f_int", Null{}},           {"f_long", Null{}},          {"f_float", Null{}},         {"f_double", Null{}},
        {"f_string", Null{}},        {"f_binary", Null{}},        {"f_decimal", Null{}},       {"f_date", Null{}},
        {"f_timestamp", Null{}},     {"f_array_array", Null{}},   {"f_array_map", Null{}},     {"f_array_struct", Null{}},
        {"f_map", Null{}},           {"f_map_map", Null{}},       {"f_map_array", Null{}},     {"f_map_struct", Null{}},
        {"f_struct", Null{}},        {"f_struct_struct", Null{}}, {"f_struct_array", Null{}},  {"f_struct_map", Null{}},
    };
    readData<ParquetBlockInputFormat>("alltypes/alltypes_null.parquet", fields);
}
TEST(ParquetRead, ArrowRead)
{
    // sample.parquet holds two columns (a: BIGINT, b: DOUBLE) and
    // 20 rows (10 rows per group). Group offsets are 153 and 614.
    // Data is in plain uncompressed format:
    // a: [1..20]
    // b: [1.0..20.0]
    const std::string sample(test::gtest_data("sample.parquet"));
    ReadBufferFromFile in(sample);
    const FormatSettings format_settings{};
    auto arrow_file = test::asArrowFileForParquet(in, format_settings);
    std::unique_ptr<parquet::arrow::FileReader> reader;
    PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(arrow_file, arrow::default_memory_pool(), &reader));
    std::shared_ptr<arrow::Table> table;
    PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
    EXPECT_EQ(table->num_rows(), 20);
    EXPECT_EQ(table->num_columns(), 2);

    /// Convert the whole Arrow table into ClickHouse columns using the file's own schema as header.
    auto columns = toSampleBlock(test::readParquetSchema(sample));
    Block header(columns);
    ArrowColumnToCHColumn converter(
        header,
        "Parquet",
        format_settings,
        std::nullopt,
        std::nullopt,
        format_settings.parquet.allow_missing_columns,
        format_settings.null_as_default,
        format_settings.date_time_overflow_behavior,
        format_settings.parquet.case_insensitive_column_matching);
    Chunk chunk = converter.arrowTableToCHChunk(table, table->num_rows(), nullptr, nullptr);
    Block res = header.cloneWithColumns(chunk.detachColumns());
    EXPECT_EQ(res.rows(), 20);

    /// checkAndGetColumn returns nullptr when the column is not Nullable; ASSERT before dereferencing.
    ASSERT_TRUE(res.getByName("a").column != nullptr);
    const auto * nullable_a = checkAndGetColumn<ColumnNullable>(res.getByName("a").column.get());
    ASSERT_TRUE(nullable_a != nullptr) << "column a is expected to be Nullable";
    const auto & col_a = nullable_a->getNestedColumn();
    for (size_t i = 0; i < res.rows(); i++)
        EXPECT_EQ(col_a.get64(i), i + 1);

    ASSERT_TRUE(res.getByName("b").column != nullptr);
    const auto * nullable_b = checkAndGetColumn<ColumnNullable>(res.getByName("b").column.get());
    ASSERT_TRUE(nullable_b != nullptr) << "column b is expected to be Nullable";
    const auto & col_b = nullable_b->getNestedColumn();
    for (size_t i = 0; i < res.rows(); i++)
        EXPECT_DOUBLE_EQ(col_b.getFloat64(i), static_cast<Float64>(i + 1));
}
TEST(ParquetRead, LowLevelRead)
{
    /// Reads column "a" of sample.parquet directly through parquet-cpp's RecordReader,
    /// one row group at a time, without Arrow or ClickHouse input formats in between.
    const std::string path(test::gtest_data("sample.parquet"));
    const std::unique_ptr<parquet::ParquetFileReader> file_reader = parquet::ParquetFileReader::OpenFile(path, false);
    const std::shared_ptr<parquet::FileMetaData> metadata = file_reader->metadata();

    const int row_group_count = metadata->num_row_groups();
    EXPECT_EQ(row_group_count, 2);
    const int column_count = metadata->num_columns();
    EXPECT_EQ(column_count, 2);

    constexpr int column_index = 0;
    const parquet::ColumnDescriptor & descr = *(metadata->schema()->Column(column_index));
    EXPECT_EQ(descr.name(), "a");

    const parquet::internal::LevelInfo levels = computeLevelInfo(&descr);
    const auto record_reader = parquet::internal::RecordReader::Make(&descr, levels);

    for (int group = 0; group < row_group_count; ++group)
    {
        const std::shared_ptr<parquet::RowGroupReader> group_reader = file_reader->RowGroup(group);
        record_reader->SetPageReader(group_reader->GetColumnPageReader(column_index));
        const int64_t records = record_reader->ReadRecords(10);
        EXPECT_EQ(records, 10);
        const auto * values = reinterpret_cast<const Int64 *>(record_reader->values());
        ASSERT_TRUE(values != nullptr);
    }
}
TEST(ParquetRead, VectorizedColumnReader)
{
    /// Reads sample.parquet through VectorizedParquetRecordReader with page-index metadata.
    /// The requested header lists "b" before "a", and the checks below expect the output
    /// columns in that same order (index 0 = b, index 1 = a).
    const std::string sample(test::gtest_data("sample.parquet"));
    const FormatSettings format_settings{};
    Block requested_header({{DOUBLE(), "b"}, {BIGINT(), "a"}});
    ReadBufferFromFile in(sample);

    ParquetMetaBuilder meta_builder{.collectPageIndex = true};
    meta_builder.build(in, requested_header);
    ColumnIndexRowRangesProvider row_ranges{meta_builder};

    VectorizedParquetRecordReader record_reader(requested_header, format_settings);
    auto arrow_file = test::asArrowFileForParquet(in, format_settings);
    record_reader.initialize(arrow_file, row_ranges);

    auto chunk{record_reader.nextBatch()};
    ASSERT_EQ(chunk.getNumColumns(), 2);
    ASSERT_EQ(chunk.getNumRows(), 20);

    /// TODO: nullable — the columns are accessed directly, not through ColumnNullable.
    const auto & b_values = *(chunk.getColumns()[0]);
    const auto & a_values = *(chunk.getColumns()[1]);
    for (size_t row = 0; row < chunk.getNumRows(); ++row)
        EXPECT_EQ(a_values.get64(row), row + 1);
    for (size_t row = 0; row < chunk.getNumRows(); ++row)
        EXPECT_EQ(b_values.getFloat64(row), row + 1);
}
INCBIN(_upper_col_parquet_, SOURCE_DIR "/utils/extern-local-engine/tests/json/upper_col_parquet.json");
/// Runs the embedded plan (upper_col_parquet.json) over upper_case_col.parquet and prints the
/// first produced block. `{replace_local_files}` in the split template is presumably substituted
/// with `file` by create_plan_and_executor — confirm against the test utils.
TEST(ParquetRead, UpperColRead)
{
    constexpr std::string_view split_template
        = R"({"items":[{"uriFile":"{replace_local_files}","length":"459","parquet":{},"schema":{},"metadataColumns":[{"key":"input_file_name","value":"{replace_local_files}"},{"key":"input_file_block_length","value":"459"},{"key":"input_file_block_start","value":"0"}],"properties":{"fileSize":"459","modificationTime":"1735012863732"}}]})";
    const std::string file{test::gtest_uri("upper_case_col.parquet")};
    auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_upper_col_parquet_), split_template, file, {});
    /// ASSERT (not EXPECT): nextColumnar() is dereferenced below and must only be called
    /// when a block is available.
    ASSERT_TRUE(local_executor->hasNext());
    const Block & block = *local_executor->nextColumnar();
    debug::headBlock(block);
}
#endif