| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <ranges> |
| |
| #include <arrow/c/bridge.h> |
| #include <arrow/json/from_string.h> |
| #include <arrow/util/decimal.h> |
| #include <avro/Compiler.hh> |
| #include <avro/Generic.hh> |
| #include <avro/Node.hh> |
| #include <avro/Types.hh> |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include "iceberg/avro/avro_data_util_internal.h" |
| #include "iceberg/avro/avro_schema_util_internal.h" |
| #include "iceberg/schema.h" |
| #include "iceberg/schema_internal.h" |
| #include "iceberg/schema_util.h" |
| #include "iceberg/test/matchers.h" |
| #include "iceberg/type.h" |
| |
| namespace iceberg::avro { |
| |
| /// \brief Test case structure for parameterized primitive type tests |
| struct AppendDatumParam { |
| std::string name; |
| std::shared_ptr<Type> projected_type; |
| std::shared_ptr<Type> source_type; |
| std::function<void(::avro::GenericDatum&, int)> value_setter; |
| std::string expected_json; |
| }; |
| |
| /// \brief Helper function to create test data for a primitive type |
| std::vector<::avro::GenericDatum> CreateTestData( |
| const ::avro::NodePtr& avro_node, |
| const std::function<void(::avro::GenericDatum&, int)>& value_setter, int count = 3) { |
| std::vector<::avro::GenericDatum> avro_data; |
| for (int i = 0; i < count; ++i) { |
| ::avro::GenericDatum avro_datum(avro_node); |
| value_setter(avro_datum, i); |
| avro_data.push_back(avro_datum); |
| } |
| return avro_data; |
| } |
| |
| /// \brief Utility function to verify AppendDatumToBuilder behavior |
| void VerifyAppendDatumToBuilder(const Schema& projected_schema, |
| const ::avro::NodePtr& avro_node, |
| const std::vector<::avro::GenericDatum>& avro_data, |
| std::string_view expected_array_json) { |
| // Create 1 to 1 projection |
| auto projection_result = Project(projected_schema, avro_node, /*prune_source=*/false); |
| ASSERT_THAT(projection_result, IsOk()); |
| auto projection = std::move(projection_result.value()); |
| |
| // Create arrow schema and array builder |
| ArrowSchema arrow_c_schema; |
| ASSERT_THAT(ToArrowSchema(projected_schema, &arrow_c_schema), IsOk()); |
| auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); |
| auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); |
| auto builder = ::arrow::MakeBuilder(arrow_struct_type).ValueOrDie(); |
| |
| // Call AppendDatumToBuilder repeatedly to append the datum |
| for (const auto& avro_datum : avro_data) { |
| ASSERT_THAT(AppendDatumToBuilder(avro_node, avro_datum, projection, projected_schema, |
| /*metadata_context=*/{}, builder.get()), |
| IsOk()); |
| } |
| |
| // Verify the result |
| auto array = builder->Finish().ValueOrDie(); |
| auto expected_array = |
| ::arrow::json::ArrayFromJSONString(arrow_struct_type, expected_array_json) |
| .ValueOrDie(); |
| ASSERT_TRUE(array->Equals(*expected_array)) |
| << "array: " << array->ToString() |
| << "\nexpected_array: " << expected_array->ToString(); |
| } |
| |
| /// \brief Test class for primitive types using parameterized tests |
| class AppendDatumToBuilderTest : public ::testing::TestWithParam<AppendDatumParam> {}; |
| |
| TEST_P(AppendDatumToBuilderTest, PrimitiveType) { |
| const auto& test_case = GetParam(); |
| |
| Schema projected_schema({SchemaField::MakeRequired( |
| /*field_id=*/1, /*name=*/"a", test_case.projected_type)}); |
| Schema source_schema({SchemaField::MakeRequired( |
| /*field_id=*/1, /*name=*/"a", test_case.source_type)}); |
| |
| ::avro::NodePtr avro_node; |
| EXPECT_THAT(ToAvroNodeVisitor{}.Visit(source_schema, &avro_node), IsOk()); |
| |
| auto avro_data = CreateTestData(avro_node, test_case.value_setter); |
| ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(projected_schema, avro_node, |
| avro_data, test_case.expected_json)); |
| } |
| |
| // Define test cases for all primitive types |
| const std::vector<AppendDatumParam> kPrimitiveTestCases = { |
| { |
| .name = "Boolean", |
| .projected_type = iceberg::boolean(), |
| .source_type = iceberg::boolean(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<bool>() = |
| (i % 2 == 0); |
| }, |
| .expected_json = R"([{"a": true}, {"a": false}, {"a": true}])", |
| }, |
| { |
| .name = "Int", |
| .projected_type = iceberg::int32(), |
| .source_type = iceberg::int32(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<int32_t>() = i * 100; |
| }, |
| .expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])", |
| }, |
| { |
| .name = "Long", |
| .projected_type = iceberg::int64(), |
| .source_type = iceberg::int64(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<int64_t>() = |
| i * 1000000LL; |
| }, |
| .expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])", |
| }, |
| { |
| .name = "Float", |
| .projected_type = iceberg::float32(), |
| .source_type = iceberg::float32(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<float>() = i * 3.14f; |
| }, |
| .expected_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])", |
| }, |
| { |
| .name = "Double", |
| .projected_type = iceberg::float64(), |
| .source_type = iceberg::float64(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<double>() = |
| i * 1.234567890; |
| }, |
| .expected_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 2.469135780}])", |
| }, |
| { |
| .name = "String", |
| .projected_type = iceberg::string(), |
| .source_type = iceberg::string(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<std::string>() = |
| "test_string_" + std::to_string(i); |
| }, |
| .expected_json = |
| R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": "test_string_2"}])", |
| }, |
| { |
| .name = "Binary", |
| .projected_type = iceberg::binary(), |
| .source_type = iceberg::binary(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>() |
| .fieldAt(0) |
| .value<std::vector<uint8_t>>() = {static_cast<uint8_t>('a' + i), |
| static_cast<uint8_t>('b' + i), |
| static_cast<uint8_t>('c' + i)}; |
| }, |
| .expected_json = R"([{"a": "abc"}, {"a": "bcd"}, {"a": "cde"}])", |
| }, |
| { |
| .name = "Fixed", |
| .projected_type = iceberg::fixed(4), |
| .source_type = iceberg::fixed(4), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>() |
| .fieldAt(0) |
| .value<::avro::GenericFixed>() |
| .value() = { |
| static_cast<uint8_t>('a' + i), static_cast<uint8_t>('b' + i), |
| static_cast<uint8_t>('c' + i), static_cast<uint8_t>('d' + i)}; |
| }, |
| .expected_json = R"([{"a": "abcd"}, {"a": "bcde"}, {"a": "cdef"}])", |
| }, |
| /// FIXME: NotImplemented: MakeBuilder: cannot construct builder for type |
| /// extension<arrow.uuid>. Need to fix this in the upstream Arrow. |
| // { |
| // .name = "UUID", |
| // .projected_type = iceberg::uuid(), |
| // .source_type = iceberg::uuid(), |
| // .value_setter = |
| // [](::avro::GenericDatum& datum, int i) { |
| // datum.value<::avro::GenericRecord>() |
| // .fieldAt(0) |
| // .value<::avro::GenericFixed>() |
| // .value() = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', |
| // 'i', 'j', 'k', 'l', 'm', 'n', 'o', |
| // static_cast<uint8_t>(i)}; |
| // }, |
| // .expected_json = R"([{"a": "abcdefghijklmnop"}, {"a": "bcdefghijklmnopq"}, |
| // {"a": "cdefghijklmnopqr"}])", |
| // }, |
| { |
| .name = "Decimal", |
| .projected_type = iceberg::decimal(10, 2), |
| .source_type = iceberg::decimal(10, 2), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| int32_t decimal_value = i * 1000 + i; |
| std::vector<uint8_t>& fixed = datum.value<::avro::GenericRecord>() |
| .fieldAt(0) |
| .value<::avro::GenericFixed>() |
| .value(); |
| // The byte array must contain the two's-complement representation of |
| // the unscaled integer value in big-endian byte order. |
| for (uint8_t& rvalue : std::ranges::reverse_view(fixed)) { |
| rvalue = static_cast<uint8_t>(decimal_value & 0xFF); |
| decimal_value >>= 8; |
| } |
| }, |
| .expected_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])", |
| }, |
| { |
| .name = "Date", |
| .projected_type = iceberg::date(), |
| .source_type = iceberg::date(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| // Date as days since epoch (1970-01-01) |
| // 0 = 1970-01-01, 1 = 1970-01-02, etc. |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<int32_t>() = |
| 18000 + i; // ~2019-04-11 + i days |
| }, |
| .expected_json = R"([{"a": 18000}, {"a": 18001}, {"a": 18002}])", |
| }, |
| { |
| .name = "Time", |
| .projected_type = iceberg::time(), |
| .source_type = iceberg::time(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| // Time as microseconds since midnight |
| // 12:30:45.123456 + i seconds = 45045123456 + i*1000000 microseconds |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<int64_t>() = |
| 45045123456LL + i * 1000000LL; |
| }, |
| .expected_json = |
| R"([{"a": 45045123456}, {"a": 45046123456}, {"a": 45047123456}])", |
| }, |
| { |
| .name = "Timestamp", |
| .projected_type = iceberg::timestamp(), |
| .source_type = iceberg::timestamp(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<int64_t>() = |
| i * 1000000LL; |
| }, |
| .expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])", |
| }, |
| { |
| .name = "TimestampTz", |
| .projected_type = std::make_shared<TimestampTzType>(), |
| .source_type = std::make_shared<TimestampTzType>(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<int64_t>() = |
| 1672531200000000LL + i * 1000000LL; |
| }, |
| .expected_json = |
| R"([{"a": 1672531200000000}, {"a": 1672531201000000}, {"a": 1672531202000000}])", |
| }, |
| { |
| .name = "IntToLongPromotion", |
| .projected_type = iceberg::int64(), |
| .source_type = iceberg::int32(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<int32_t>() = i * 100; |
| }, |
| .expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])", |
| }, |
| { |
| .name = "FloatToDoublePromotion", |
| .projected_type = iceberg::float64(), |
| .source_type = iceberg::float32(), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| datum.value<::avro::GenericRecord>().fieldAt(0).value<float>() = i * 1.0f; |
| }, |
| .expected_json = R"([{"a": 0.0}, {"a": 1.0}, {"a": 2.0}])", |
| }, |
| { |
| .name = "DecimalPrecisionPromotion", |
| .projected_type = iceberg::decimal(10, 2), |
| .source_type = iceberg::decimal(6, 2), |
| .value_setter = |
| [](::avro::GenericDatum& datum, int i) { |
| int32_t decimal_value = i * 1000 + i; |
| std::vector<uint8_t>& fixed = datum.value<::avro::GenericRecord>() |
| .fieldAt(0) |
| .value<::avro::GenericFixed>() |
| .value(); |
| for (uint8_t& rvalue : std::ranges::reverse_view(fixed)) { |
| rvalue = static_cast<uint8_t>(decimal_value & 0xFF); |
| decimal_value >>= 8; |
| } |
| }, |
| .expected_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])", |
| }, |
| }; |
| |
| INSTANTIATE_TEST_SUITE_P(AllPrimitiveTypes, AppendDatumToBuilderTest, |
| ::testing::ValuesIn(kPrimitiveTestCases), |
| [](const ::testing::TestParamInfo<AppendDatumParam>& info) { |
| return info.param.name; |
| }); |
| |
| TEST(AppendDatumToBuilderTest, StructWithTwoFields) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired(1, "id", iceberg::int32()), |
| SchemaField::MakeRequired(2, "name", iceberg::string()), |
| }); |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| std::vector<::avro::GenericDatum> avro_data; |
| ::avro::GenericDatum avro_datum(avro_node); |
| auto& record = avro_datum.value<::avro::GenericRecord>(); |
| record.fieldAt(0).value<int32_t>() = 42; |
| record.fieldAt(1).value<std::string>() = "test"; |
| avro_data.push_back(avro_datum); |
| |
| ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data, |
| R"([{"id": 42, "name": "test"}])")); |
| } |
| |
| TEST(AppendDatumToBuilderTest, NestedStruct) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired(1, "id", iceberg::int32()), |
| SchemaField::MakeRequired( |
| 2, "person", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(3, "name", iceberg::string()), |
| SchemaField::MakeRequired(4, "age", iceberg::int32()), |
| })), |
| }); |
| |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| std::vector<::avro::GenericDatum> avro_data; |
| for (int i = 0; i < 2; ++i) { |
| ::avro::GenericDatum avro_datum(avro_node); |
| auto& record = avro_datum.value<::avro::GenericRecord>(); |
| |
| // Set id field |
| record.fieldAt(0).value<int32_t>() = i + 1; |
| |
| // Set nested person struct |
| auto& person_record = record.fieldAt(1).value<::avro::GenericRecord>(); |
| person_record.fieldAt(0).value<std::string>() = "Person" + std::to_string(i); |
| person_record.fieldAt(1).value<int32_t>() = 25 + i; |
| |
| avro_data.push_back(avro_datum); |
| } |
| |
| const std::string expected_json = R"([ |
| {"id": 1, "person": {"name": "Person0", "age": 25}}, |
| {"id": 2, "person": {"name": "Person1", "age": 26}} |
| ])"; |
| ASSERT_NO_FATAL_FAILURE( |
| VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data, expected_json)); |
| } |
| |
| TEST(AppendDatumToBuilderTest, ListOfIntegers) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired(1, "numbers", |
| std::make_shared<ListType>(SchemaField::MakeRequired( |
| 2, "element", iceberg::int32()))), |
| }); |
| |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| std::vector<::avro::GenericDatum> avro_data; |
| for (int i = 0; i < 2; ++i) { |
| ::avro::GenericDatum avro_datum(avro_node); |
| auto& record = avro_datum.value<::avro::GenericRecord>(); |
| |
| // Create array with values [i*10, i*10+1, i*10+2] |
| auto& array = record.fieldAt(0).value<::avro::GenericArray>(); |
| for (int j = 0; j < 3; ++j) { |
| ::avro::GenericDatum element(avro_node->leafAt(0)->leafAt(0)); |
| element.value<int32_t>() = i * 10 + j; |
| array.value().push_back(element); |
| } |
| |
| avro_data.push_back(avro_datum); |
| } |
| |
| const std::string expected_json = R"([ |
| {"numbers": [0, 1, 2]}, |
| {"numbers": [10, 11, 12]} |
| ])"; |
| ASSERT_NO_FATAL_FAILURE( |
| VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data, expected_json)); |
| } |
| |
| TEST(AppendDatumToBuilderTest, ListOfStructs) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired( |
| 1, "people", |
| std::make_shared<ListType>(SchemaField::MakeRequired( |
| 2, "element", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(3, "name", iceberg::string()), |
| SchemaField::MakeRequired(4, "age", iceberg::int32()), |
| })))), |
| }); |
| |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| std::vector<::avro::GenericDatum> avro_data; |
| for (int i = 0; i < 2; ++i) { |
| ::avro::GenericDatum avro_datum(avro_node); |
| auto& record = avro_datum.value<::avro::GenericRecord>(); |
| |
| auto& array = record.fieldAt(0).value<::avro::GenericArray>(); |
| for (int j = 0; j < 2; ++j) { |
| ::avro::GenericDatum element(avro_node->leafAt(0)->leafAt(0)); |
| auto& person_record = element.value<::avro::GenericRecord>(); |
| person_record.fieldAt(0).value<std::string>() = |
| "Person" + std::to_string(i) + "_" + std::to_string(j); |
| person_record.fieldAt(1).value<int32_t>() = 20 + i * 10 + j; |
| array.value().push_back(element); |
| } |
| |
| avro_data.push_back(avro_datum); |
| } |
| |
| const std::string expected_json = R"([ |
| {"people": [ |
| {"name": "Person0_0", "age": 20}, |
| {"name": "Person0_1", "age": 21} |
| ]}, |
| {"people": [ |
| {"name": "Person1_0", "age": 30}, |
| {"name": "Person1_1", "age": 31} |
| ]} |
| ])"; |
| ASSERT_NO_FATAL_FAILURE( |
| VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data, expected_json)); |
| } |
| |
| TEST(AppendDatumToBuilderTest, MapStringToInt) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired( |
| 1, "scores", |
| std::make_shared<MapType>( |
| SchemaField::MakeRequired(2, "key", iceberg::string()), |
| SchemaField::MakeRequired(3, "value", iceberg::int32()))), |
| }); |
| |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| std::vector<::avro::GenericDatum> avro_data; |
| for (int i = 0; i < 2; ++i) { |
| ::avro::GenericDatum avro_datum(avro_node); |
| auto& record = avro_datum.value<::avro::GenericRecord>(); |
| |
| auto& map = record.fieldAt(0).value<::avro::GenericMap>(); |
| auto& map_container = map.value(); |
| |
| map_container.emplace_back("score_" + std::to_string(i * 2), |
| ::avro::GenericDatum(static_cast<int32_t>(100 + i * 10))); |
| map_container.emplace_back( |
| "score_" + std::to_string(i * 2 + 1), |
| ::avro::GenericDatum(static_cast<int32_t>(100 + i * 10 + 5))); |
| |
| avro_data.push_back(avro_datum); |
| } |
| |
| const std::string expected_json = R"([ |
| {"scores": [["score_0", 100], ["score_1", 105]]}, |
| {"scores": [["score_2", 110], ["score_3", 115]]} |
| ])"; |
| ASSERT_NO_FATAL_FAILURE( |
| VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data, expected_json)); |
| } |
| |
| TEST(AppendDatumToBuilderTest, MapIntToStringAsArray) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired( |
| 1, "names", |
| std::make_shared<MapType>( |
| SchemaField::MakeRequired(2, "key", iceberg::int32()), |
| SchemaField::MakeRequired(3, "value", iceberg::string()))), |
| }); |
| |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| std::vector<::avro::GenericDatum> avro_data; |
| for (int i = 0; i < 2; ++i) { |
| ::avro::GenericDatum avro_datum(avro_node); |
| auto& record = avro_datum.value<::avro::GenericRecord>(); |
| |
| auto& array = record.fieldAt(0).value<::avro::GenericArray>(); |
| for (int j = 0; j < 2; ++j) { |
| ::avro::GenericDatum kv_pair(avro_node->leafAt(0)->leafAt(0)); |
| auto& kv_record = kv_pair.value<::avro::GenericRecord>(); |
| kv_record.fieldAt(0).value<int32_t>() = i * 10 + j; |
| kv_record.fieldAt(1).value<std::string>() = "name_" + std::to_string(i * 10 + j); |
| array.value().push_back(kv_pair); |
| } |
| |
| avro_data.push_back(avro_datum); |
| } |
| |
| const std::string expected_json = R"([ |
| {"names": [[0, "name_0"], [1, "name_1"]]}, |
| {"names": [[10, "name_10"], [11, "name_11"]]} |
| ])"; |
| ASSERT_NO_FATAL_FAILURE( |
| VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data, expected_json)); |
| } |
| |
| TEST(AppendDatumToBuilderTest, MapStringToStruct) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired( |
| 1, "users", |
| std::make_shared<MapType>( |
| SchemaField::MakeRequired(2, "key", iceberg::string()), |
| SchemaField::MakeRequired( |
| 3, "value", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(4, "id", iceberg::int32()), |
| SchemaField::MakeRequired(5, "email", iceberg::string()), |
| })))), |
| }); |
| |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| std::vector<::avro::GenericDatum> avro_data; |
| for (int i = 0; i < 2; ++i) { |
| ::avro::GenericDatum avro_datum(avro_node); |
| auto& record = avro_datum.value<::avro::GenericRecord>(); |
| |
| auto& map = record.fieldAt(0).value<::avro::GenericMap>(); |
| auto& map_container = map.value(); |
| |
| ::avro::GenericDatum struct_value(avro_node->leafAt(0)->leafAt(1)); |
| auto& struct_record = struct_value.value<::avro::GenericRecord>(); |
| struct_record.fieldAt(0).value<int32_t>() = 1000 + i; |
| struct_record.fieldAt(1).value<std::string>() = |
| "user" + std::to_string(i) + "@example.com"; |
| |
| map_container.emplace_back("user_" + std::to_string(i), std::move(struct_value)); |
| |
| avro_data.push_back(avro_datum); |
| } |
| |
| const std::string expected_json = R"([ |
| {"users": [["user_0", {"id": 1000, "email": "user0@example.com"}]]}, |
| {"users": [["user_1", {"id": 1001, "email": "user1@example.com"}]]} |
| ])"; |
| ASSERT_NO_FATAL_FAILURE( |
| VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data, expected_json)); |
| } |
| |
| TEST(AppendDatumToBuilderTest, StructWithMissingOptionalField) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired(1, "id", iceberg::int32()), |
| SchemaField::MakeRequired(2, "name", iceberg::string()), |
| SchemaField::MakeOptional(3, "age", |
| iceberg::int32()), // Missing in Avro |
| SchemaField::MakeOptional(4, "email", |
| iceberg::string()), // Missing in Avro |
| }); |
| |
| // Create Avro schema that only has id and name fields (missing age and email) |
| std::string avro_schema_json = R"({ |
| "type": "record", |
| "name": "person", |
| "fields": [ |
| {"name": "id", "type": "int", "field-id": 1}, |
| {"name": "name", "type": "string", "field-id": 2} |
| ] |
| })"; |
| auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); |
| |
| std::vector<::avro::GenericDatum> avro_data; |
| for (int i = 0; i < 2; ++i) { |
| ::avro::GenericDatum avro_datum(avro_schema.root()); |
| auto& record = avro_datum.value<::avro::GenericRecord>(); |
| record.fieldAt(0).value<int32_t>() = i + 1; |
| record.fieldAt(1).value<std::string>() = "Person" + std::to_string(i); |
| avro_data.push_back(avro_datum); |
| } |
| |
| const std::string expected_json = R"([ |
| {"id": 1, "name": "Person0", "age": null, "email": null}, |
| {"id": 2, "name": "Person1", "age": null, "email": null} |
| ])"; |
| ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(iceberg_schema, avro_schema.root(), |
| avro_data, expected_json)); |
| } |
| |
| TEST(AppendDatumToBuilderTest, NestedStructWithMissingOptionalFields) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired(1, "id", iceberg::int32()), |
| SchemaField::MakeRequired( |
| 2, "person", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(3, "name", iceberg::string()), |
| SchemaField::MakeOptional(4, "age", |
| iceberg::int32()), // Missing |
| SchemaField::MakeOptional(5, "phone", |
| iceberg::string()), // Missing |
| })), |
| SchemaField::MakeOptional(6, "department", |
| iceberg::string()), // Missing |
| }); |
| |
| // Create Avro schema with only id, person.name fields |
| std::string avro_schema_json = R"({ |
| "type": "record", |
| "name": "employee", |
| "fields": [ |
| {"name": "id", "type": "int", "field-id": 1}, |
| {"name": "person", "type": { |
| "type": "record", |
| "name": "person_info", |
| "fields": [ |
| {"name": "name", "type": "string", "field-id": 3} |
| ] |
| }, "field-id": 2} |
| ] |
| })"; |
| auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); |
| |
| std::vector<::avro::GenericDatum> avro_data; |
| for (int i = 0; i < 2; ++i) { |
| ::avro::GenericDatum avro_datum(avro_schema.root()); |
| auto& record = avro_datum.value<::avro::GenericRecord>(); |
| |
| record.fieldAt(0).value<int32_t>() = i + 100; |
| |
| auto& person_record = record.fieldAt(1).value<::avro::GenericRecord>(); |
| person_record.fieldAt(0).value<std::string>() = "Employee" + std::to_string(i); |
| |
| avro_data.push_back(avro_datum); |
| } |
| |
| const std::string expected_json = R"([ |
| {"id": 100, "person": {"name": "Employee0", "age": null, "phone": null}, "department": null}, |
| {"id": 101, "person": {"name": "Employee1", "age": null, "phone": null}, "department": null} |
| ])"; |
| ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(iceberg_schema, avro_schema.root(), |
| avro_data, expected_json)); |
| } |
| |
| TEST(AppendDatumToBuilderTest, ListWithMissingOptionalElementFields) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired( |
| 1, "people", |
| std::make_shared<ListType>(SchemaField::MakeRequired( |
| 2, "element", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(3, "name", iceberg::string()), |
| SchemaField::MakeOptional(4, "age", |
| iceberg::int32()), // Missing in Avro |
| SchemaField::MakeOptional(5, "email", |
| iceberg::string()), // Missing in Avro |
| })))), |
| }); |
| |
| // Create Avro schema with list of structs that only have name field |
| std::string avro_schema_json = R"({ |
| "type": "record", |
| "name": "people_list", |
| "fields": [ |
| {"name": "people", "type": { |
| "type": "array", |
| "items": { |
| "type": "record", |
| "name": "person", |
| "fields": [ |
| {"name": "name", "type": "string", "field-id": 3} |
| ] |
| }, |
| "element-id": 2 |
| }, "field-id": 1} |
| ] |
| })"; |
| auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); |
| |
| std::vector<::avro::GenericDatum> avro_data; |
| for (int i = 0; i < 2; ++i) { |
| ::avro::GenericDatum avro_datum(avro_schema.root()); |
| auto& record = avro_datum.value<::avro::GenericRecord>(); |
| |
| auto& array = record.fieldAt(0).value<::avro::GenericArray>(); |
| for (int j = 0; j < 2; ++j) { |
| ::avro::GenericDatum element(avro_schema.root()->leafAt(0)->leafAt(0)); |
| auto& person_record = element.value<::avro::GenericRecord>(); |
| person_record.fieldAt(0).value<std::string>() = |
| "Person" + std::to_string(i) + "_" + std::to_string(j); |
| array.value().push_back(element); |
| } |
| |
| avro_data.push_back(avro_datum); |
| } |
| |
| const std::string expected_json = R"([ |
| {"people": [ |
| {"name": "Person0_0", "age": null, "email": null}, |
| {"name": "Person0_1", "age": null, "email": null} |
| ]}, |
| {"people": [ |
| {"name": "Person1_0", "age": null, "email": null}, |
| {"name": "Person1_1", "age": null, "email": null} |
| ]} |
| ])"; |
| ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(iceberg_schema, avro_schema.root(), |
| avro_data, expected_json)); |
| } |
| |
| struct ExtractDatumParam { |
| std::string name; |
| std::shared_ptr<Type> iceberg_type; |
| std::string arrow_json; |
| std::function<void(const ::avro::GenericDatum&, int)> value_verifier; |
| }; |
| |
| void VerifyExtractDatumFromArray(const ExtractDatumParam& test_case) { |
| Schema iceberg_schema({SchemaField::MakeRequired( |
| /*field_id=*/1, /*name=*/"a", test_case.iceberg_type)}); |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| ArrowSchema arrow_c_schema; |
| ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk()); |
| auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); |
| auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); |
| auto arrow_array = |
| ::arrow::json::ArrayFromJSONString(arrow_struct_type, test_case.arrow_json) |
| .ValueOrDie(); |
| |
| for (int64_t i = 0; i < arrow_array->length(); ++i) { |
| ::avro::GenericDatum extracted_datum(avro_node); |
| ASSERT_THAT(ExtractDatumFromArray(*arrow_array, i, &extracted_datum), IsOk()) |
| << "Failed to extract at index " << i; |
| test_case.value_verifier(extracted_datum, static_cast<int>(i)); |
| } |
| } |
| |
| class ExtractDatumFromArrayTest : public ::testing::TestWithParam<ExtractDatumParam> {}; |
| |
| TEST_P(ExtractDatumFromArrayTest, PrimitiveType) { |
| ASSERT_NO_FATAL_FAILURE(VerifyExtractDatumFromArray(GetParam())); |
| } |
| |
| const std::vector<ExtractDatumParam> kExtractDatumTestCases = { |
| { |
| .name = "Boolean", |
| .iceberg_type = boolean(), |
| .arrow_json = R"([{"a": true}, {"a": false}, {"a": true}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| bool expected = (i % 2 == 0); |
| EXPECT_EQ(record.fieldAt(0).value<bool>(), expected); |
| }, |
| }, |
| { |
| .name = "Int", |
| .iceberg_type = int32(), |
| .arrow_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| EXPECT_EQ(record.fieldAt(0).value<int32_t>(), i * 100); |
| }, |
| }, |
| { |
| .name = "Long", |
| .iceberg_type = int64(), |
| .arrow_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| EXPECT_EQ(record.fieldAt(0).value<int64_t>(), i * 1000000LL); |
| }, |
| }, |
| { |
| .name = "Float", |
| .iceberg_type = float32(), |
| .arrow_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| EXPECT_FLOAT_EQ(record.fieldAt(0).value<float>(), i * 3.14f); |
| }, |
| }, |
| { |
| .name = "Double", |
| .iceberg_type = float64(), |
| .arrow_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 2.469135780}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| EXPECT_DOUBLE_EQ(record.fieldAt(0).value<double>(), i * 1.234567890); |
| }, |
| }, |
| { |
| .name = "String", |
| .iceberg_type = string(), |
| .arrow_json = |
| R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": "test_string_2"}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| std::string expected = "test_string_" + std::to_string(i); |
| EXPECT_EQ(record.fieldAt(0).value<std::string>(), expected); |
| }, |
| }, |
| { |
| .name = "Binary", |
| .iceberg_type = binary(), |
| .arrow_json = R"([{"a": "abc"}, {"a": "bcd"}, {"a": "cde"}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| const auto& bytes = record.fieldAt(0).value<std::vector<uint8_t>>(); |
| EXPECT_EQ(bytes.size(), 3); |
| EXPECT_EQ(bytes[0], static_cast<uint8_t>('a' + i)); |
| EXPECT_EQ(bytes[1], static_cast<uint8_t>('b' + i)); |
| EXPECT_EQ(bytes[2], static_cast<uint8_t>('c' + i)); |
| }, |
| }, |
| { |
| .name = "Fixed", |
| .iceberg_type = fixed(4), |
| .arrow_json = R"([{"a": "abcd"}, {"a": "bcde"}, {"a": "cdef"}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| const auto& fixed = record.fieldAt(0).value<::avro::GenericFixed>(); |
| EXPECT_EQ(fixed.value().size(), 4); |
| EXPECT_EQ(static_cast<char>(fixed.value()[0]), static_cast<char>('a' + i)); |
| EXPECT_EQ(static_cast<char>(fixed.value()[1]), static_cast<char>('b' + i)); |
| EXPECT_EQ(static_cast<char>(fixed.value()[2]), static_cast<char>('c' + i)); |
| EXPECT_EQ(static_cast<char>(fixed.value()[3]), static_cast<char>('d' + i)); |
| }, |
| }, |
| { |
| .name = "Decimal", |
| .iceberg_type = decimal(10, 2), |
| .arrow_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| const auto& fixed = record.fieldAt(0).value<::avro::GenericFixed>(); |
| |
| const auto& bytes = fixed.value(); |
| auto decimal = |
| ::arrow::Decimal128::FromBigEndian( |
| reinterpret_cast<const uint8_t*>(bytes.data()), bytes.size()) |
| .ValueOrDie(); |
| int64_t expected_unscaled = i * 1000 + i; |
| EXPECT_EQ(decimal.low_bits(), static_cast<uint64_t>(expected_unscaled)); |
| EXPECT_EQ(decimal.high_bits(), 0); |
| }, |
| }, |
| { |
| .name = "Date", |
| .iceberg_type = date(), |
| .arrow_json = R"([{"a": 18000}, {"a": 18001}, {"a": 18002}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 18000 + i); |
| }, |
| }, |
| { |
| .name = "Time", |
| .iceberg_type = time(), |
| .arrow_json = R"([{"a": 45045123456}, {"a": 45046123456}, {"a": 45047123456}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| EXPECT_EQ(record.fieldAt(0).value<int64_t>(), |
| 45045123456LL + i * 1000000LL); |
| }, |
| }, |
| { |
| .name = "Timestamp", |
| .iceberg_type = timestamp(), |
| .arrow_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| EXPECT_EQ(record.fieldAt(0).value<int64_t>(), i * 1000000LL); |
| }, |
| }, |
| { |
| .name = "TimestampTz", |
| .iceberg_type = timestamp_tz(), |
| .arrow_json = |
| R"([{"a": 1672531200000000}, {"a": 1672531201000000}, {"a": 1672531202000000}])", |
| .value_verifier = |
| [](const ::avro::GenericDatum& datum, int i) { |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| EXPECT_EQ(record.fieldAt(0).value<int64_t>(), |
| 1672531200000000LL + i * 1000000LL); |
| }, |
| }, |
| }; |
| |
| INSTANTIATE_TEST_SUITE_P(AllPrimitiveTypes, ExtractDatumFromArrayTest, |
| ::testing::ValuesIn(kExtractDatumTestCases), |
| [](const ::testing::TestParamInfo<ExtractDatumParam>& info) { |
| return info.param.name; |
| }); |
| |
| TEST(ExtractDatumFromArrayTest, StructWithTwoFields) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired(1, "id", int32()), |
| SchemaField::MakeRequired(2, "name", string()), |
| }); |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| ArrowSchema arrow_c_schema; |
| ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk()); |
| auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); |
| auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); |
| |
| auto arrow_array = ::arrow::json::ArrayFromJSONString(arrow_struct_type, |
| R"([ |
| {"id": 42, "name": "Alice"}, |
| {"id": 43, "name": "Bob"}, |
| {"id": 44, "name": "Charlie"} |
| ])") |
| .ValueOrDie(); |
| |
| struct ExpectedData { |
| int32_t id; |
| std::string name; |
| }; |
| std::vector<ExpectedData> expected = {{.id = 42, .name = "Alice"}, |
| {.id = 43, .name = "Bob"}, |
| {.id = 44, .name = "Charlie"}}; |
| |
| auto verify_record = [&](int64_t index, const ExpectedData& expected_data) { |
| ::avro::GenericDatum extracted_datum(avro_node); |
| ASSERT_THAT(ExtractDatumFromArray(*arrow_array, index, &extracted_datum), IsOk()); |
| const auto& record = extracted_datum.value<::avro::GenericRecord>(); |
| EXPECT_EQ(record.fieldAt(0).value<int32_t>(), expected_data.id); |
| EXPECT_EQ(record.fieldAt(1).value<std::string>(), expected_data.name); |
| }; |
| |
| for (size_t i = 0; i < expected.size(); ++i) { |
| verify_record(i, expected[i]); |
| } |
| } |
| |
| TEST(ExtractDatumFromArrayTest, NestedStruct) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired(1, "id", int32()), |
| SchemaField::MakeRequired(2, "person", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(3, "name", string()), |
| SchemaField::MakeRequired(4, "age", int32()), |
| })), |
| }); |
| |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| ArrowSchema arrow_c_schema; |
| ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk()); |
| auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); |
| auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); |
| |
| const std::string arrow_json = R"([ |
| {"id": 1, "person": {"name": "Alice", "age": 25}}, |
| {"id": 2, "person": {"name": "Bob", "age": 30}}, |
| {"id": 3, "person": {"name": "Charlie", "age": 35}} |
| ])"; |
| auto arrow_array = |
| ::arrow::json::ArrayFromJSONString(arrow_struct_type, arrow_json).ValueOrDie(); |
| |
| struct ExpectedData { |
| int32_t id; |
| std::string name; |
| int32_t age; |
| }; |
| std::vector<ExpectedData> expected = {{.id = 1, .name = "Alice", .age = 25}, |
| {.id = 2, .name = "Bob", .age = 30}, |
| {.id = 3, .name = "Charlie", .age = 35}}; |
| |
| auto verify_record = [&](int64_t index, const ExpectedData& expected_data) { |
| ::avro::GenericDatum extracted_datum(avro_node); |
| ASSERT_THAT(ExtractDatumFromArray(*arrow_array, index, &extracted_datum), IsOk()); |
| const auto& record = extracted_datum.value<::avro::GenericRecord>(); |
| EXPECT_EQ(record.fieldAt(0).value<int32_t>(), expected_data.id); |
| const auto& person_record = record.fieldAt(1).value<::avro::GenericRecord>(); |
| EXPECT_EQ(person_record.fieldAt(0).value<std::string>(), expected_data.name); |
| EXPECT_EQ(person_record.fieldAt(1).value<int32_t>(), expected_data.age); |
| }; |
| |
| for (size_t i = 0; i < expected.size(); ++i) { |
| verify_record(i, expected[i]); |
| } |
| } |
| |
| TEST(ExtractDatumFromArrayTest, ListOfIntegers) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired( |
| 1, "numbers", |
| std::make_shared<ListType>(SchemaField::MakeRequired(2, "element", int32()))), |
| }); |
| |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| ArrowSchema arrow_c_schema; |
| ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk()); |
| auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); |
| auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); |
| |
| const std::string arrow_json = R"([ |
| {"numbers": [10, 11, 12]}, |
| {"numbers": [20, 21]}, |
| {"numbers": [30, 31, 32, 33]} |
| ])"; |
| auto arrow_array = |
| ::arrow::json::ArrayFromJSONString(arrow_struct_type, arrow_json).ValueOrDie(); |
| |
| std::vector<std::vector<int32_t>> expected = {{10, 11, 12}, {20, 21}, {30, 31, 32, 33}}; |
| |
| auto verify_record = [&](int64_t index, const std::vector<int32_t>& expected_numbers) { |
| ::avro::GenericDatum extracted_datum(avro_node); |
| ASSERT_THAT(ExtractDatumFromArray(*arrow_array, index, &extracted_datum), IsOk()); |
| const auto& record = extracted_datum.value<::avro::GenericRecord>(); |
| const auto& array = record.fieldAt(0).value<::avro::GenericArray>(); |
| const auto& elements = array.value(); |
| |
| ASSERT_EQ(elements.size(), expected_numbers.size()); |
| for (size_t i = 0; i < expected_numbers.size(); ++i) { |
| EXPECT_EQ(elements[i].value<int32_t>(), expected_numbers[i]); |
| } |
| }; |
| |
| for (size_t i = 0; i < expected.size(); ++i) { |
| verify_record(i, expected[i]); |
| } |
| } |
| |
| TEST(ExtractDatumFromArrayTest, MapStringToInt) { |
| Schema iceberg_schema({ |
| SchemaField::MakeRequired( |
| 1, "scores", |
| std::make_shared<MapType>(SchemaField::MakeRequired(2, "key", string()), |
| SchemaField::MakeRequired(3, "value", int32()))), |
| }); |
| |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| ArrowSchema arrow_c_schema; |
| ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk()); |
| auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); |
| auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); |
| |
| const std::string arrow_json = R"([ |
| {"scores": [["alice", 95], ["bob", 87]]}, |
| {"scores": [["charlie", 92], ["diana", 98], ["eve", 89]]}, |
| {"scores": [["frank", 91]]} |
| ])"; |
| auto arrow_array = |
| ::arrow::json::ArrayFromJSONString(arrow_struct_type, arrow_json).ValueOrDie(); |
| |
| using MapEntry = std::pair<std::string, int32_t>; |
| std::vector<std::vector<MapEntry>> expected = { |
| {{"alice", 95}, {"bob", 87}}, |
| {{"charlie", 92}, {"diana", 98}, {"eve", 89}}, |
| {{"frank", 91}}}; |
| |
| auto verify_record = [&](int64_t index, const std::vector<MapEntry>& expected_entries) { |
| ::avro::GenericDatum extracted_datum(avro_node); |
| ASSERT_THAT(ExtractDatumFromArray(*arrow_array, index, &extracted_datum), IsOk()); |
| const auto& record = extracted_datum.value<::avro::GenericRecord>(); |
| const auto& map = record.fieldAt(0).value<::avro::GenericMap>(); |
| const auto& entries = map.value(); |
| |
| ASSERT_EQ(entries.size(), expected_entries.size()); |
| for (size_t i = 0; i < expected_entries.size(); ++i) { |
| EXPECT_EQ(entries[i].first, expected_entries[i].first); |
| EXPECT_EQ(entries[i].second.value<int32_t>(), expected_entries[i].second); |
| } |
| }; |
| |
| for (size_t i = 0; i < expected.size(); ++i) { |
| verify_record(i, expected[i]); |
| } |
| } |
| |
| TEST(ExtractDatumFromArrayTest, ErrorHandling) { |
| Schema iceberg_schema({SchemaField::MakeRequired(1, "a", int32())}); |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| ArrowSchema arrow_c_schema; |
| ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk()); |
| auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); |
| auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); |
| |
| auto arrow_array = ::arrow::json::ArrayFromJSONString( |
| arrow_struct_type, R"([{"a": 1}, {"a": 2}, {"a": 3}])") |
| .ValueOrDie(); |
| |
| ::avro::GenericDatum datum(avro_node); |
| |
| // Test negative index |
| EXPECT_THAT(ExtractDatumFromArray(*arrow_array, -1, &datum), |
| HasErrorMessage("Cannot extract datum from array at index -1")); |
| |
| // Test index beyond array length |
| EXPECT_THAT(ExtractDatumFromArray(*arrow_array, 3, &datum), |
| HasErrorMessage("Cannot extract datum from array at index 3")); |
| } |
| |
| TEST(ExtractDatumFromArrayTest, NullHandling) { |
| Schema iceberg_schema({SchemaField::MakeOptional(1, "a", int32())}); |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); |
| |
| ArrowSchema arrow_c_schema; |
| ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk()); |
| auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); |
| auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); |
| |
| auto arrow_array = |
| ::arrow::json::ArrayFromJSONString(arrow_struct_type, R"([{"a": 42}, {"a": null}])") |
| .ValueOrDie(); |
| |
| ::avro::GenericDatum datum(avro_node); |
| ASSERT_THAT(ExtractDatumFromArray(*arrow_array, 0, &datum), IsOk()); |
| |
| const auto& record = datum.value<::avro::GenericRecord>(); |
| EXPECT_EQ(record.fieldAt(0).unionBranch(), 1); |
| EXPECT_EQ(record.fieldAt(0).type(), ::avro::AVRO_INT); |
| EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 42); |
| |
| ASSERT_THAT(ExtractDatumFromArray(*arrow_array, 1, &datum), IsOk()); |
| const auto& record2 = datum.value<::avro::GenericRecord>(); |
| EXPECT_EQ(record2.fieldAt(0).unionBranch(), 0); |
| EXPECT_EQ(record2.fieldAt(0).type(), ::avro::AVRO_NULL); |
| } |
| |
| struct RoundTripParam { |
| std::string name; |
| std::shared_ptr<Schema> iceberg_schema; |
| std::string arrow_json; |
| }; |
| |
| void VerifyRoundTripConversion(const RoundTripParam& test_case) { |
| ::avro::NodePtr avro_node; |
| ASSERT_THAT(ToAvroNodeVisitor{}.Visit(*test_case.iceberg_schema, &avro_node), IsOk()); |
| |
| ArrowSchema arrow_c_schema; |
| ASSERT_THAT(ToArrowSchema(*test_case.iceberg_schema, &arrow_c_schema), IsOk()); |
| auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); |
| auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); |
| |
| auto original_array = |
| ::arrow::json::ArrayFromJSONString(arrow_struct_type, test_case.arrow_json) |
| .ValueOrDie(); |
| |
| std::vector<::avro::GenericDatum> extracted_data; |
| for (int64_t i = 0; i < original_array->length(); ++i) { |
| ::avro::GenericDatum datum(avro_node); |
| ASSERT_THAT(ExtractDatumFromArray(*original_array, i, &datum), IsOk()) |
| << "Failed to extract datum at index " << i; |
| extracted_data.push_back(datum); |
| } |
| |
| auto projection_result = |
| Project(*test_case.iceberg_schema, avro_node, /*prune_source=*/false); |
| ASSERT_THAT(projection_result, IsOk()); |
| auto projection = std::move(projection_result.value()); |
| |
| auto builder = ::arrow::MakeBuilder(arrow_struct_type).ValueOrDie(); |
| for (const auto& datum : extracted_data) { |
| ASSERT_THAT( |
| AppendDatumToBuilder(avro_node, datum, projection, *test_case.iceberg_schema, |
| /*metadata_context=*/{}, builder.get()), |
| IsOk()); |
| } |
| |
| auto rebuilt_array = builder->Finish().ValueOrDie(); |
| |
| ASSERT_TRUE(original_array->Equals(*rebuilt_array)) |
| << "Round-trip consistency failed!\n" |
| << "Original array: " << original_array->ToString() << "\n" |
| << "Rebuilt array: " << rebuilt_array->ToString(); |
| } |
| |
| class AvroRoundTripConversionTest : public ::testing::TestWithParam<RoundTripParam> {}; |
| |
| TEST_P(AvroRoundTripConversionTest, ConvertTypes) { |
| ASSERT_NO_FATAL_FAILURE(VerifyRoundTripConversion(GetParam())); |
| } |
| |
| const std::vector<RoundTripParam> kRoundTripTestCases = { |
| { |
| .name = "SimpleStruct", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(1, "id", int32()), |
| SchemaField::MakeRequired(2, "name", string()), |
| SchemaField::MakeOptional(3, "age", int32()), |
| }), |
| .arrow_json = R"([ |
| {"id": 100, "name": "Alice", "age": 25}, |
| {"id": 101, "name": "Bob", "age": null}, |
| {"id": 102, "name": "Charlie", "age": 35} |
| ])", |
| }, |
| { |
| .name = "PrimitiveTypes", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(1, "bool_field", boolean()), |
| SchemaField::MakeRequired(2, "int_field", int32()), |
| SchemaField::MakeRequired(3, "long_field", int64()), |
| SchemaField::MakeRequired(4, "float_field", float32()), |
| SchemaField::MakeRequired(5, "double_field", float64()), |
| SchemaField::MakeRequired(6, "string_field", string()), |
| }), |
| .arrow_json = R"([ |
| {"bool_field": true, "int_field": 42, "long_field": 1000000, "float_field": 3.14, "double_field": 2.718281828, "string_field": "hello"}, |
| {"bool_field": false, "int_field": -42, "long_field": -1000000, "float_field": -3.14, "double_field": -2.718281828, "string_field": "world"} |
| ])", |
| }, |
| { |
| .name = "NestedStruct", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(1, "id", int32()), |
| SchemaField::MakeRequired( |
| 2, "person", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(3, "name", string()), |
| SchemaField::MakeRequired(4, "age", int32()), |
| })), |
| }), |
| .arrow_json = R"([ |
| {"id": 1, "person": {"name": "Alice", "age": 30}}, |
| {"id": 2, "person": {"name": "Bob", "age": 25}} |
| ])", |
| }, |
| { |
| .name = "ListOfIntegers", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired( |
| 1, "numbers", |
| std::make_shared<ListType>( |
| SchemaField::MakeRequired(2, "element", int32()))), |
| }), |
| .arrow_json = R"([ |
| {"numbers": [1, 2, 3]}, |
| {"numbers": [10, 20]}, |
| {"numbers": []} |
| ])", |
| }, |
| { |
| .name = "MapStringToInt", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired( |
| 1, "scores", |
| std::make_shared<MapType>( |
| SchemaField::MakeRequired(2, "key", string()), |
| SchemaField::MakeRequired(3, "value", int32()))), |
| }), |
| .arrow_json = R"([ |
| {"scores": [["alice", 95], ["bob", 87]]}, |
| {"scores": [["charlie", 92]]}, |
| {"scores": []} |
| ])", |
| }, |
| { |
| .name = "ComplexNested", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired( |
| 1, "data", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(2, "id", int32()), |
| SchemaField::MakeRequired( |
| 3, "tags", |
| std::make_shared<ListType>( |
| SchemaField::MakeRequired(4, "element", string()))), |
| SchemaField::MakeOptional( |
| 5, "metadata", |
| std::make_shared<MapType>( |
| SchemaField::MakeRequired(6, "key", string()), |
| SchemaField::MakeRequired(7, "value", string()))), |
| })), |
| }), |
| .arrow_json = R"([ |
| {"data": {"id": 1, "tags": ["tag1", "tag2"], "metadata": [["key1", "value1"]]}}, |
| {"data": {"id": 2, "tags": [], "metadata": null}} |
| ])", |
| }, |
| { |
| .name = "NullablePrimitives", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeOptional(1, "optional_bool", boolean()), |
| SchemaField::MakeOptional(2, "optional_int", int32()), |
| SchemaField::MakeOptional(3, "optional_long", int64()), |
| SchemaField::MakeOptional(4, "optional_string", string()), |
| SchemaField::MakeRequired(5, "required_id", int32()), |
| }), |
| .arrow_json = R"([ |
| {"optional_bool": true, "optional_int": 42, "optional_long": 1000000, "optional_string": "hello", "required_id": 1}, |
| {"optional_bool": null, "optional_int": null, "optional_long": null, "optional_string": null, "required_id": 2}, |
| {"optional_bool": false, "optional_int": null, "optional_long": 2000000, "optional_string": null, "required_id": 3}, |
| {"optional_bool": null, "optional_int": 123, "optional_long": null, "optional_string": "world", "required_id": 4} |
| ])", |
| }, |
| { |
| .name = "NullableNestedStruct", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(1, "id", int32()), |
| SchemaField::MakeOptional( |
| 2, "person", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(3, "name", string()), |
| SchemaField::MakeOptional(4, "age", int32()), |
| SchemaField::MakeOptional(5, "email", string()), |
| })), |
| SchemaField::MakeOptional(6, "department", string()), |
| }), |
| .arrow_json = R"([ |
| {"id": 1, "person": {"name": "Alice", "age": 30, "email": "alice@example.com"}, "department": "Engineering"}, |
| {"id": 2, "person": null, "department": null}, |
| {"id": 3, "person": {"name": "Bob", "age": null, "email": null}, "department": "Sales"}, |
| {"id": 4, "person": {"name": "Charlie", "age": 25, "email": null}, "department": null} |
| ])", |
| }, |
| { |
| .name = "NullableListElements", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(1, "id", int32()), |
| SchemaField::MakeOptional( |
| 2, "numbers", |
| std::make_shared<ListType>( |
| SchemaField::MakeOptional(3, "element", int32()))), |
| SchemaField::MakeRequired( |
| 4, "tags", |
| std::make_shared<ListType>( |
| SchemaField::MakeOptional(5, "element", string()))), |
| }), |
| .arrow_json = R"([ |
| {"id": 1, "numbers": [1, null, 3], "tags": ["tag1", null, "tag3"]}, |
| {"id": 2, "numbers": null, "tags": ["only_tag"]}, |
| {"id": 3, "numbers": [null, null], "tags": [null, null, null]}, |
| {"id": 4, "numbers": [], "tags": []} |
| ])", |
| }, |
| { |
| .name = "NullableMapValues", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(1, "id", int32()), |
| SchemaField::MakeOptional( |
| 2, "scores", |
| std::make_shared<MapType>( |
| SchemaField::MakeRequired(3, "key", string()), |
| SchemaField::MakeOptional(4, "value", int32()))), |
| SchemaField::MakeRequired( |
| 5, "metadata", |
| std::make_shared<MapType>( |
| SchemaField::MakeRequired(6, "key", string()), |
| SchemaField::MakeOptional(7, "value", string()))), |
| }), |
| .arrow_json = R"([ |
| {"id": 1, "scores": [["alice", 95], ["bob", null]], "metadata": [["key1", "value1"], ["key2", null]]}, |
| {"id": 2, "scores": null, "metadata": [["key3", null]]}, |
| {"id": 3, "scores": [["charlie", null], ["diana", 98]], "metadata": []}, |
| {"id": 4, "scores": [], "metadata": [["key4", null], ["key5", "value5"]]} |
| ])", |
| }, |
| { |
| .name = "DeeplyNestedWithNulls", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired( |
| 1, "root", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeRequired(2, "id", int32()), |
| SchemaField::MakeOptional( |
| 3, "nested", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeOptional(4, "name", string()), |
| SchemaField::MakeOptional( |
| 5, "values", |
| std::make_shared<ListType>( |
| SchemaField::MakeOptional(6, "element", int32()))), |
| })), |
| SchemaField::MakeOptional( |
| 7, "tags", |
| std::make_shared<ListType>( |
| SchemaField::MakeOptional(8, "element", string()))), |
| })), |
| }), |
| .arrow_json = R"([ |
| {"root": {"id": 1, "nested": {"name": "test", "values": [1, null, 3]}, "tags": ["a", "b"]}}, |
| {"root": {"id": 2, "nested": null, "tags": null}}, |
| {"root": {"id": 3, "nested": {"name": null, "values": null}, "tags": [null, "c"]}}, |
| {"root": {"id": 4, "nested": {"name": "empty", "values": []}, "tags": []}} |
| ])", |
| }, |
| { |
| .name = "AllNullsVariations", |
| .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{ |
| SchemaField::MakeOptional(1, "always_null", string()), |
| SchemaField::MakeOptional(2, "sometimes_null", int32()), |
| SchemaField::MakeOptional( |
| 3, "nested_struct", |
| std::make_shared<StructType>(std::vector<SchemaField>{ |
| SchemaField::MakeOptional(4, "inner_null", string()), |
| SchemaField::MakeRequired(5, "inner_required", boolean()), |
| })), |
| SchemaField::MakeRequired(6, "id", int32()), |
| }), |
| .arrow_json = R"([ |
| {"always_null": null, "sometimes_null": 42, "nested_struct": {"inner_null": "value", "inner_required": true}, "id": 1}, |
| {"always_null": null, "sometimes_null": null, "nested_struct": null, "id": 2}, |
| {"always_null": null, "sometimes_null": 123, "nested_struct": {"inner_null": null, "inner_required": false}, "id": 3}, |
| {"always_null": null, "sometimes_null": null, "nested_struct": {"inner_null": null, "inner_required": true}, "id": 4} |
| ])", |
| }, |
| }; |
| |
| INSTANTIATE_TEST_SUITE_P(AllTypes, AvroRoundTripConversionTest, |
| ::testing::ValuesIn(kRoundTripTestCases), |
| [](const ::testing::TestParamInfo<RoundTripParam>& info) { |
| return info.param.name; |
| }); |
| |
| } // namespace iceberg::avro |