| /* |
| * Copyright 2024-present Alibaba Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <cstdint> |
| #include <cstdlib> |
| #include <filesystem> |
| #include <limits> |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <tuple> |
| #include <utility> |
| #include <variant> |
| #include <vector> |
| |
| #include "arrow/api.h" |
| #include "arrow/array/array_base.h" |
| #include "arrow/array/array_nested.h" |
| #include "arrow/array/builder_binary.h" |
| #include "arrow/array/builder_nested.h" |
| #include "arrow/array/builder_primitive.h" |
| #include "arrow/c/abi.h" |
| #include "arrow/c/bridge.h" |
| #include "arrow/c/helpers.h" |
| #include "arrow/ipc/json_simple.h" |
| #include "arrow/type.h" |
| #include "fmt/format.h" |
| #include "gtest/gtest.h" |
| #include "paimon/catalog/catalog.h" |
| #include "paimon/catalog/identifier.h" |
| #include "paimon/commit_context.h" |
| #include "paimon/common/data/binary_array.h" |
| #include "paimon/common/data/binary_array_writer.h" |
| #include "paimon/common/data/binary_row.h" |
| #include "paimon/common/data/binary_row_writer.h" |
| #include "paimon/common/data/data_define.h" |
| #include "paimon/common/factories/io_hook.h" |
| #include "paimon/common/types/data_field.h" |
| #include "paimon/common/types/row_kind.h" |
| #include "paimon/common/utils/arrow/status_utils.h" |
| #include "paimon/common/utils/date_time_utils.h" |
| #include "paimon/common/utils/decimal_utils.h" |
| #include "paimon/common/utils/path_util.h" |
| #include "paimon/common/utils/scope_guard.h" |
| #include "paimon/core/io/compact_increment.h" |
| #include "paimon/core/io/data_file_meta.h" |
| #include "paimon/core/io/data_increment.h" |
| #include "paimon/core/manifest/file_source.h" |
| #include "paimon/core/schema/schema_manager.h" |
| #include "paimon/core/snapshot.h" |
| #include "paimon/core/stats/simple_stats.h" |
| #include "paimon/core/table/sink/commit_message_impl.h" |
| #include "paimon/core/table/source/data_split_impl.h" |
| #include "paimon/core/utils/file_utils.h" |
| #include "paimon/core/utils/snapshot_manager.h" |
| #include "paimon/data/blob.h" |
| #include "paimon/data/decimal.h" |
| #include "paimon/data/timestamp.h" |
| #include "paimon/defs.h" |
| #include "paimon/file_store_commit.h" |
| #include "paimon/file_store_write.h" |
| #include "paimon/format/file_format.h" |
| #include "paimon/format/file_format_factory.h" |
| #include "paimon/format/reader_builder.h" |
| #include "paimon/fs/file_system.h" |
| #include "paimon/fs/local/local_file_system.h" |
| #include "paimon/memory/memory_pool.h" |
| #include "paimon/read_context.h" |
| #include "paimon/reader/batch_reader.h" |
| #include "paimon/reader/file_batch_reader.h" |
| #include "paimon/record_batch.h" |
| #include "paimon/result.h" |
| #include "paimon/scan_context.h" |
| #include "paimon/status.h" |
| #include "paimon/table/source/data_split.h" |
| #include "paimon/table/source/plan.h" |
| #include "paimon/table/source/startup_mode.h" |
| #include "paimon/table/source/table_read.h" |
| #include "paimon/table/source/table_scan.h" |
| #include "paimon/testing/utils/binary_row_generator.h" |
| #include "paimon/testing/utils/data_generator.h" |
| #include "paimon/testing/utils/io_exception_helper.h" |
| #include "paimon/testing/utils/read_result_collector.h" |
| #include "paimon/testing/utils/test_helper.h" |
| #include "paimon/testing/utils/testharness.h" |
| #include "paimon/testing/utils/timezone_guard.h" |
| #include "paimon/utils/roaring_bitmap32.h" |
| #include "paimon/write_context.h" |

namespace paimon {
| class CommitMessage; |
| class TableSchema; |
| } // namespace paimon |
| |
| namespace paimon::test { |
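// End-to-end write integration tests: create a table, write record batches, commit,
// then verify commit messages, snapshots, and read-back results. Parameterized by
// file format (parquet, plus orc/lance/avro when compiled in).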
class WriteInteTest : public testing::Test, public testing::WithParamInterface<std::string> {
| public: |
| void SetUp() override { |
| file_system_ = std::make_shared<LocalFileSystem>(); |
| pool_ = GetDefaultPool(); |
        int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
        std::srand(static_cast<unsigned int>(seed));
| } |
| |
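    // Creates a database and a table through the catalog and returns the table's
    // root path, which follows the "<base_path>/<db_name>.db/<table_name>" layout.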
| Result<std::string> CreateTestTable(const std::string& base_path, const std::string& db_name, |
| const std::string& table_name, ::ArrowSchema* schema, |
| const std::vector<std::string>& partition_keys, |
| const std::vector<std::string>& primary_keys, |
| const std::map<std::string, std::string>& options) const { |
| PAIMON_ASSIGN_OR_RAISE(auto catalog, Catalog::Create(base_path, options)); |
| PAIMON_RETURN_NOT_OK(catalog->CreateDatabase(db_name, options, /*ignore_if_exists=*/false)); |
| Identifier table_id(db_name, table_name); |
| PAIMON_RETURN_NOT_OK(catalog->CreateTable(table_id, schema, partition_keys, primary_keys, |
| options, |
| /*ignore_if_exists=*/false)); |
| return PathUtil::JoinPath(base_path, db_name + ".db/" + table_name); |
| } |
| |
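    // A test row of (utf8, int32, int32, float64). MakeRecordBatch converts such rows
    // into an Arrow struct array, exports it through the Arrow C data interface, and
    // wraps it in a RecordBatch carrying the given partition map and bucket.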
| using ValueType = std::tuple<std::string, int32_t, int32_t, double>; |
| Result<std::unique_ptr<RecordBatch>> MakeRecordBatch( |
| const std::vector<ValueType>& raw_data, |
| const std::map<std::string, std::string>& partition_map, int32_t bucket) const { |
| ::ArrowArray arrow_array; |
| std::shared_ptr<arrow::Array> array = GenerateArrowArray(raw_data); |
| PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &arrow_array)); |
| RecordBatchBuilder batch_builder(&arrow_array); |
| return batch_builder.SetPartition(partition_map).SetBucket(bucket).Finish(); |
| } |
| |
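    // Builds a (utf8, utf8, int32, float64) BinaryRow with the given row kind.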
| BinaryRow MakeBinaryRow(const RowKind* kind, const std::string& f0, const std::string& f1, |
| int32_t f2, double f3) const { |
| return BinaryRowGenerator::GenerateRow(kind, {f0, f1, f2, f3}, pool_.get()); |
| } |
| |
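    // Asserts that the given subdirectories of root_path together contain exactly
    // expect_file_count regular (non-directory) entries (assumes ListDir appends
    // to status_list rather than overwriting it).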
| void CheckFileCount(const std::string& root_path, const std::vector<std::string>& subdirs, |
| int32_t expect_file_count) const { |
| std::vector<std::unique_ptr<BasicFileStatus>> status_list; |
| for (const auto& dir : subdirs) { |
| ASSERT_OK(file_system_->ListDir(PathUtil::JoinPath(root_path, dir), &status_list)); |
| } |
| int32_t file_count = 0; |
| for (const auto& file_status : status_list) { |
| if (!file_status->IsDir()) { |
| file_count++; |
| } |
| } |
| ASSERT_EQ(file_count, expect_file_count); |
| } |
| |
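    // Builds an Arrow struct array (utf8, int32, int32, float64) from the raw tuples;
    // when exist_null_value is set, nulls are appended for the double field instead
    // of the tuple values.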
| std::shared_ptr<arrow::Array> GenerateArrowArray(const std::vector<ValueType>& raw_data, |
| bool exist_null_value = false) const { |
| auto string_field = arrow::field("f0", arrow::utf8()); |
| auto int_field = arrow::field("f1", arrow::int32()); |
| auto int_field1 = arrow::field("f2", arrow::int32()); |
| auto double_field = arrow::field("f3", arrow::float64()); |
| auto struct_type = arrow::struct_({string_field, int_field, int_field1, double_field}); |
| auto schema = |
| arrow::schema(arrow::FieldVector({string_field, int_field, int_field1, double_field})); |
| |
| arrow::StructBuilder struct_builder( |
| struct_type, arrow::default_memory_pool(), |
| {std::make_shared<arrow::StringBuilder>(), std::make_shared<arrow::Int32Builder>(), |
| std::make_shared<arrow::Int32Builder>(), std::make_shared<arrow::DoubleBuilder>()}); |
| auto string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0)); |
| auto int_builder = static_cast<arrow::Int32Builder*>(struct_builder.field_builder(1)); |
| auto int_builder1 = static_cast<arrow::Int32Builder*>(struct_builder.field_builder(2)); |
| auto double_builder = static_cast<arrow::DoubleBuilder*>(struct_builder.field_builder(3)); |
| |
| for (const auto& d : raw_data) { |
| EXPECT_TRUE(struct_builder.Append().ok()); |
| EXPECT_TRUE(string_builder->Append(std::get<0>(d)).ok()); |
| EXPECT_TRUE(int_builder->Append(std::get<1>(d)).ok()); |
| EXPECT_TRUE(int_builder1->Append(std::get<2>(d)).ok()); |
| if (exist_null_value) { |
| EXPECT_TRUE(double_builder->AppendNull().ok()); |
| } else { |
| EXPECT_TRUE(double_builder->Append(std::get<3>(d)).ok()); |
| } |
| } |
| std::shared_ptr<arrow::Array> array; |
| EXPECT_TRUE(struct_builder.Finish(&array).ok()); |
| return array; |
| } |
| |
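    // Rewrites an expected DataFileMeta for formats that do not collect per-column
    // statistics (lance, avro): min/max values and null counts are replaced with nulls.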
| std::shared_ptr<DataFileMeta> ReconstructDataFileMeta( |
| const std::shared_ptr<DataFileMeta>& file_meta) const { |
| if (GetParam() != "lance" && GetParam() != "avro") { |
| return file_meta; |
| } |
        // For the lance and avro formats, all column stats are null.
| auto new_meta = std::make_shared<DataFileMeta>( |
| file_meta->file_name, file_meta->file_size, file_meta->row_count, file_meta->min_key, |
| file_meta->max_key, file_meta->key_stats, file_meta->value_stats, |
| file_meta->min_sequence_number, file_meta->max_sequence_number, file_meta->schema_id, |
| file_meta->level, file_meta->extra_files, file_meta->creation_time, |
| file_meta->delete_row_count, file_meta->embedded_index, file_meta->file_source, |
| file_meta->value_stats_cols, file_meta->external_path, file_meta->first_row_id, |
| file_meta->write_cols); |
| auto generate_null_stats = [this](const SimpleStats& stats) -> SimpleStats { |
| if (stats == SimpleStats::EmptyStats()) { |
| return stats; |
| } |
| BinaryRowGenerator::ValueType min_values(stats.MinValues().GetFieldCount(), NullType()); |
| BinaryRowGenerator::ValueType max_values(stats.MaxValues().GetFieldCount(), NullType()); |
| BinaryArray null_counts; |
| BinaryArrayWriter array_writer(&null_counts, stats.NullCounts().Size(), |
| /*element_size=*/sizeof(int64_t), pool_.get()); |
| for (int32_t i = 0; i < stats.NullCounts().Size(); ++i) { |
| array_writer.SetNullAt(i); |
| } |
| array_writer.Complete(); |
| |
| return SimpleStats(BinaryRowGenerator::GenerateRow(min_values, pool_.get()), |
| BinaryRowGenerator::GenerateRow(max_values, pool_.get()), |
| null_counts); |
| }; |
| new_meta->key_stats = generate_null_stats(new_meta->key_stats); |
| new_meta->value_stats = generate_null_stats(new_meta->value_stats); |
| return new_meta; |
| } |
| |
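    // Verifies, under the Asia/Shanghai timezone, that each newly written file's
    // creation_time is local time: 8 hours (28,800,000 ms) ahead of the UTC epoch
    // millis, and within the range (2023-09-28, 2223-09-28).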
| void CheckCreationTime(const std::vector<std::shared_ptr<CommitMessage>>& commit_messages) { |
| TimezoneGuard guard("Asia/Shanghai"); |
| for (const auto& msg : commit_messages) { |
| auto msg_impl = dynamic_cast<CommitMessageImpl*>(msg.get()); |
| ASSERT_TRUE(msg_impl); |
| const auto& data_increment = msg_impl->GetNewFilesIncrement(); |
| for (const auto& meta : data_increment.NewFiles()) { |
| const auto& creation_time = meta->creation_time; |
| ASSERT_OK_AND_ASSIGN(int64_t utc_creation_time, meta->CreationTimeEpochMillis()); |
            ASSERT_EQ(creation_time.GetMillisecond() - utc_creation_time, 28800000L);
            // creation time > 2023-09-28 and < 2223-09-28
            ASSERT_GT(creation_time.GetMillisecond(), 1695866400000L);
            ASSERT_LT(creation_time.GetMillisecond(), 8007213600000L);
| } |
| } |
| } |
| |
protected:
| std::shared_ptr<FileSystem> file_system_; |
| std::shared_ptr<MemoryPool> pool_; |
| }; |
| |
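// File formats to run this suite against; optional formats are included only when
// the corresponding compile-time feature flag is defined.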
| std::vector<std::string> GetTestValuesForWriteInteTest() { |
| std::vector<std::string> values; |
| values.emplace_back("parquet"); |
| #ifdef PAIMON_ENABLE_ORC |
| values.emplace_back("orc"); |
| #endif |
| #ifdef PAIMON_ENABLE_LANCE |
| values.emplace_back("lance"); |
| #endif |
| #ifdef PAIMON_ENABLE_AVRO |
| values.emplace_back("avro"); |
| #endif |
| return values; |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(FileFormat, WriteInteTest, |
| ::testing::ValuesIn(GetTestValuesForWriteInteTest())); |
| |
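// Batch write to an unpartitioned, unbucketed (bucket = -1) append-only table: one
// commit of four rows covering all primitive types and nulls, verified against the
// expected commit message, snapshot counters, and read-back results; a second commit
// must fail because batch write mode only supports committing once.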
| TEST_P(WriteInteTest, TestAppendTableBatchWrite) { |
| auto dir = UniqueTestDirectory::Create(); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), |
| arrow::field("f2", arrow::int16()), arrow::field("f3", arrow::int32()), |
| arrow::field("field_null", arrow::int32()), arrow::field("f4", arrow::int64()), |
| arrow::field("f5", arrow::float32()), arrow::field("f6", arrow::float64()), |
| arrow::field("f7", arrow::utf8()), arrow::field("f8", arrow::binary())}; |
| auto schema = arrow::schema(fields); |
| |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = {{Options::MANIFEST_FORMAT, "orc"}, |
| {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::BUCKET, "-1"}, |
| {Options::FILE_SYSTEM, "local"}}; |
| |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, |
| /*primary_keys=*/{}, options, /*is_streaming_mode=*/false)); |
| int64_t commit_identifier = 0; |
| |
| std::string data_1 = |
| R"([[true, 0, 32767, 2147483647, null, 4294967295, 0.5, 1.141592659, "20250327", "banana"], |
| [false, 1, 32767, null, null, 4294967296, 1.0, 2.141592658, "20250327", "dog"], |
| [null, 1, 32767, 2147483647, null, null, 2.0, 3.141592657, null, "lucy"], |
| [true, -2, -32768, -2147483648, null, -4294967298, 2.0, 3.141592657, "20250326", "mouse"]])"; |
| |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_1, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data_1, |
| /*partition_map=*/{}, /*bucket=*/0, {})); |
| |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, /*row_count=*/4, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats( |
| {false, static_cast<int8_t>(-2), static_cast<int16_t>(-32768), |
| static_cast<int32_t>(-2147483648), NullType(), static_cast<int64_t>(-4294967298), |
| static_cast<float>(0.5), 1.141592659, std::string("20250326"), NullType()}, |
| {true, static_cast<int8_t>(1), static_cast<int16_t>(32767), |
| static_cast<int32_t>(2147483647), NullType(), static_cast<int64_t>(4294967296), |
| static_cast<float>(2.0), 3.141592657, std::string("20250327"), NullType()}, |
| std::vector<int64_t>({1, 0, 0, 1, 4, 1, 0, 0, 1, 0}), pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/3, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta = ReconstructDataFileMeta(file_meta); |
| DataIncrement data_increment({file_meta}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_1 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/-1, data_increment, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_1 = { |
| expected_commit_message_1}; |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch_1), commit_identifier++, |
| expected_commit_messages_1)); |
| CheckCreationTime(commit_msgs); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(4, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(4, snapshot1.value().DeltaRecordCount().value()); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_1, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits_1.size(), 1); |
| std::string expected_data_1 = |
| R"([[0, true, 0, 32767, 2147483647, null, 4294967295, 0.5, 1.141592659, "20250327", "banana"], |
| [0, false, 1, 32767, null, null, 4294967296, 1.0, 2.141592658, "20250327", "dog"], |
| [0, null, 1, 32767, 2147483647, null, null, 2.0, 3.141592657, null, "lucy"], |
| [0, true, -2, -32768, -2147483648, null, -4294967298, 2.0, 3.141592657, "20250326", "mouse"]])"; |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, data_splits_1, expected_data_1)); |
| ASSERT_TRUE(success); |

    std::string data_2 =
| fmt::format(R"([ |
| [true, {},{},{},null,{},{},{},"∞",""], |
| [false,{},{},{},null,{},{},{}, "",""], |
| [true, 42,-1,999999999,null,123456789012345,0.1,-Inf,"a\u20ACb","binary\ndata"]])", |
| std::numeric_limits<int8_t>::min(), std::numeric_limits<int16_t>::min(), |
| std::numeric_limits<int32_t>::min(), std::numeric_limits<int64_t>::min(), |
| std::numeric_limits<float>::min(), std::numeric_limits<double>::min(), |
| std::numeric_limits<int8_t>::max(), std::numeric_limits<int16_t>::max(), |
| std::numeric_limits<int32_t>::max(), std::numeric_limits<int64_t>::max(), |
| std::numeric_limits<float>::max(), std::numeric_limits<double>::max()); |
| |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_2, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data_2, |
| /*partition_map=*/{}, /*bucket=*/0, {})); |
| ASSERT_NOK_WITH_MSG( |
| helper->WriteAndCommit(std::move(batch_2), commit_identifier++, std::nullopt), |
| "batch write mode only support one-time committing."); |
| } |
| |
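// Streaming write to an unpartitioned append-only table with a single bucket keyed
// on f5: two write/commit rounds, the second using numeric-limit values, with commit
// messages, snapshot counters, and scan results verified after each round.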
| TEST_P(WriteInteTest, TestAppendTableStreamWriteWithOneBucket) { |
| auto dir = UniqueTestDirectory::Create(); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), |
| arrow::field("f2", arrow::int16()), arrow::field("f3", arrow::int32()), |
| arrow::field("field_null", arrow::int32()), arrow::field("f4", arrow::int64()), |
| arrow::field("f5", arrow::float32()), arrow::field("f6", arrow::float64()), |
| arrow::field("f7", arrow::utf8()), arrow::field("f8", arrow::binary())}; |
| auto schema = arrow::schema(fields); |
| |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "1"}, |
| {Options::BUCKET_KEY, "f5"}, {Options::FILE_SYSTEM, "local"}, |
| }; |
| |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, |
| /*primary_keys=*/{}, options, /*is_streaming_mode=*/true)); |
| int64_t commit_identifier = 0; |
| |
| std::string data_1 = |
| R"([[true, 0, 32767, 2147483647, null, 4294967295, 0.5, 1.141592659, "20250327", "banana"], |
| [false, 1, 32767, null, null, 4294967296, 1.0, 2.141592658, "20250327", "dog"], |
| [null, 1, 32767, 2147483647, null, null, 2.0, 3.141592657, null, "lucy"], |
| [true, -2, -32768, -2147483648, null, -4294967298, 2.0, 3.141592657, "20250326", "mouse"]])"; |
| |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_1, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data_1, |
| /*partition_map=*/{}, /*bucket=*/0, {})); |
| |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, /*row_count=*/4, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats( |
| {false, static_cast<int8_t>(-2), static_cast<int16_t>(-32768), |
| static_cast<int32_t>(-2147483648), NullType(), static_cast<int64_t>(-4294967298), |
| static_cast<float>(0.5), 1.141592659, std::string("20250326"), NullType()}, |
| {true, static_cast<int8_t>(1), static_cast<int16_t>(32767), |
| static_cast<int32_t>(2147483647), NullType(), static_cast<int64_t>(4294967296), |
| static_cast<float>(2.0), 3.141592657, std::string("20250327"), NullType()}, |
| std::vector<int64_t>({1, 0, 0, 1, 4, 1, 0, 0, 1, 0}), pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/3, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta = ReconstructDataFileMeta(file_meta); |
| DataIncrement data_increment({file_meta}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_1 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/1, data_increment, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_1 = { |
| expected_commit_message_1}; |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch_1), commit_identifier++, |
| expected_commit_messages_1)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(4, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(4, snapshot1.value().DeltaRecordCount().value()); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| std::string expected_data_1 = |
| R"([[0, true, 0, 32767, 2147483647, null, 4294967295, 0.5, 1.141592659, "20250327", "banana"], |
| [0, false, 1, 32767, null, null, 4294967296, 1.0, 2.141592658, "20250327", "dog"], |
| [0, null, 1, 32767, 2147483647, null, null, 2.0, 3.141592657, null, "lucy"], |
| [0, true, -2, -32768, -2147483648, null, -4294967298, 2.0, 3.141592657, "20250326", "mouse"]])"; |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_1, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits_1.size(), 1); |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, data_splits_1, expected_data_1)); |
| ASSERT_TRUE(success); |
| |
| std::string data_2 = |
| fmt::format(R"([ |
| [true, {},{},{},null,{},{},{},"∞",""], |
| [false,{},{},{},null,{},{},{}, "",""], |
| [true, 42,-1,999999999,null,123456789012345,0.1,-Inf,"a\u20ACb","binary\ndata"]])", |
| std::numeric_limits<int8_t>::min(), std::numeric_limits<int16_t>::min(), |
| std::numeric_limits<int32_t>::min(), std::numeric_limits<int64_t>::min(), |
| std::numeric_limits<float>::min(), std::numeric_limits<double>::min(), |
| std::numeric_limits<int8_t>::max(), std::numeric_limits<int16_t>::max(), |
| std::numeric_limits<int32_t>::max(), std::numeric_limits<int64_t>::max(), |
| std::numeric_limits<float>::max(), std::numeric_limits<double>::max()); |
| |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_2, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data_2, |
| /*partition_map=*/{}, /*bucket=*/0, {})); |
| |
| auto file_meta_2 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, /*row_count=*/3, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats( |
| {false, std::numeric_limits<int8_t>::min(), std::numeric_limits<int16_t>::min(), |
| std::numeric_limits<int32_t>::min(), NullType(), std::numeric_limits<int64_t>::min(), |
| std::numeric_limits<float>::min(), -std::numeric_limits<double>::infinity(), |
| std::string(""), NullType()}, |
| {true, std::numeric_limits<int8_t>::max(), std::numeric_limits<int16_t>::max(), |
| std::numeric_limits<int32_t>::max(), NullType(), std::numeric_limits<int64_t>::max(), |
| std::numeric_limits<float>::max(), std::numeric_limits<double>::max(), |
| std::string("∞"), NullType()}, |
| std::vector<int64_t>({0, 0, 0, 0, 3, 0, 0, 0, 0, 0}), pool_.get()), |
| /*min_sequence_number=*/4, /*max_sequence_number=*/6, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_2 = ReconstructDataFileMeta(file_meta_2); |
| DataIncrement data_increment_2({file_meta_2}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_2 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/1, data_increment_2, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_2 = { |
| expected_commit_message_2}; |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs_2, |
| helper->WriteAndCommit(std::move(batch_2), commit_identifier++, |
| expected_commit_messages_2)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot2, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot2); |
| ASSERT_EQ(2, snapshot2.value().Id()); |
| ASSERT_EQ(7, snapshot2.value().TotalRecordCount().value()); |
| ASSERT_EQ(3, snapshot2.value().DeltaRecordCount().value()); |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_2, helper->Scan()); |
| ASSERT_EQ(data_splits_2.size(), 1); |
| |
| std::string expected_data_2 = |
| fmt::format(R"([ |
| [0, true, {},{},{},null,{},{},{},"∞",""], |
| [0, false,{},{},{},null,{},{},{}, "",""], |
| [0, true, 42,-1,999999999,null,123456789012345,0.1,-Inf,"a\u20ACb","binary\ndata"]])", |
| std::numeric_limits<int8_t>::min(), std::numeric_limits<int16_t>::min(), |
| std::numeric_limits<int32_t>::min(), std::numeric_limits<int64_t>::min(), |
| std::numeric_limits<float>::min(), std::numeric_limits<double>::min(), |
| std::numeric_limits<int8_t>::max(), std::numeric_limits<int16_t>::max(), |
| std::numeric_limits<int32_t>::max(), std::numeric_limits<int64_t>::max(), |
| std::numeric_limits<float>::max(), std::numeric_limits<double>::max()); |
| ASSERT_OK_AND_ASSIGN(success, |
| helper->ReadAndCheckResult(data_type, data_splits_2, expected_data_2)); |
| ASSERT_TRUE(success); |
| } |
| |
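// Streaming write to an append-only table partitioned by (f2, f1) with two buckets
// keyed on f0: input rows are pre-split by partition and bucket, and after each of
// the two commits every split is read back and checked per (partition, bucket) pair.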
| TEST_P(WriteInteTest, TestAppendTableStreamWriteWithPartitionAndMultiBuckets) { |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()), |
| arrow::field("f2", arrow::utf8()), arrow::field("f3", arrow::float64())}; |
| auto schema = arrow::schema(fields); |
| std::vector<std::string> primary_keys = {}; |
| std::vector<std::string> partition_keys = {"f2", "f1"}; |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "2"}, |
| {Options::BUCKET_KEY, "f0"}, {Options::FILE_SYSTEM, "local"}, |
| }; |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, options, |
| /*is_streaming_mode=*/true)); |
| int64_t commit_identifier = 0; |
| ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> table_schema, |
| helper->LatestSchema()); |
| ASSERT_TRUE(table_schema); |
| |
| DataGenerator gen(table_schema.value(), pool_); |
| std::vector<BinaryRow> datas_1; |
| datas_1.push_back( |
| BinaryRowGenerator::GenerateRow({0, 19, std::string("20250326"), 9.1}, pool_.get())); |
| datas_1.push_back( |
| BinaryRowGenerator::GenerateRow({0, 19, std::string("20250326"), 10.1}, pool_.get())); |
| datas_1.push_back( |
| BinaryRowGenerator::GenerateRow({1, 19, std::string("20250326"), 11.1}, pool_.get())); |
| datas_1.push_back( |
| BinaryRowGenerator::GenerateRow({1, 19, std::string("20250326"), 12.1}, pool_.get())); |
| datas_1.push_back( |
| BinaryRowGenerator::GenerateRow({1, 19, std::string("20250326"), 13.1}, pool_.get())); |
| datas_1.push_back( |
| BinaryRowGenerator::GenerateRow({0, 22, std::string("20250326"), 14.1}, pool_.get())); |
| datas_1.push_back( |
| BinaryRowGenerator::GenerateRow({0, 22, std::string("20250326"), 15.1}, pool_.get())); |
| datas_1.push_back( |
| BinaryRowGenerator::GenerateRow({0, 22, std::string("20250326"), 16.1}, pool_.get())); |
| ASSERT_OK_AND_ASSIGN(auto batches_1, gen.SplitArrayByPartitionAndBucket(datas_1)); |
| ASSERT_EQ(3, batches_1.size()); |
| ASSERT_OK_AND_ASSIGN( |
| auto commit_msgs, |
| helper->WriteAndCommit(std::move(batches_1), commit_identifier++, std::nullopt)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(8, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(8, snapshot1.value().DeltaRecordCount().value()); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_1, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits_1.size(), 3); |
| |
| std::map<std::pair<std::string, int32_t>, std::string> expected_datas_1; |
| expected_datas_1[std::make_pair("f2=20250326/f1=19/", 1)] = R"([ |
| [0, 0, 19, "20250326", 9.1], |
| [0, 0, 19, "20250326", 10.1] |
| ])"; |
| |
| expected_datas_1[std::make_pair("f2=20250326/f1=19/", 0)] = R"([ |
| [0, 1, 19, "20250326", 11.1], |
| [0, 1, 19, "20250326", 12.1], |
| [0, 1, 19, "20250326", 13.1] |
| ])"; |
| |
| expected_datas_1[std::make_pair("f2=20250326/f1=22/", 1)] = R"([ |
| [0, 0, 22, "20250326", 14.1], |
| [0, 0, 22, "20250326", 15.1], |
| [0, 0, 22, "20250326", 16.1] |
| ])"; |
| |
| for (const auto& split : data_splits_1) { |
| auto split_impl = dynamic_cast<DataSplitImpl*>(split.get()); |
| ASSERT_OK_AND_ASSIGN(std::string partition_str, |
| helper->PartitionStr(split_impl->Partition())); |
| auto iter = expected_datas_1.find(std::make_pair(partition_str, split_impl->Bucket())); |
| ASSERT_TRUE(iter != expected_datas_1.end()); |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, {split}, iter->second)); |
| ASSERT_TRUE(success); |
| } |
| |
| std::vector<BinaryRow> datas_2; |
| datas_2.push_back( |
| BinaryRowGenerator::GenerateRow({0, 20, std::string("20250326"), 19.1}, pool_.get())); |
| datas_2.push_back( |
| BinaryRowGenerator::GenerateRow({0, 20, std::string("20250326"), 20.1}, pool_.get())); |
| datas_2.push_back( |
| BinaryRowGenerator::GenerateRow({1, 20, std::string("20250326"), 21.1}, pool_.get())); |
| datas_2.push_back( |
| BinaryRowGenerator::GenerateRow({1, 20, std::string("20250326"), 22.1}, pool_.get())); |
| datas_2.push_back( |
| BinaryRowGenerator::GenerateRow({1, 20, std::string("20250326"), 23.1}, pool_.get())); |
| datas_2.push_back( |
| BinaryRowGenerator::GenerateRow({0, 23, std::string("20250326"), 24.1}, pool_.get())); |
| datas_2.push_back( |
| BinaryRowGenerator::GenerateRow({0, 23, std::string("20250326"), 25.1}, pool_.get())); |
| datas_2.push_back( |
| BinaryRowGenerator::GenerateRow({0, 23, std::string("20250326"), 26.1}, pool_.get())); |
| ASSERT_OK_AND_ASSIGN(auto batches_2, gen.SplitArrayByPartitionAndBucket(datas_2)); |
| ASSERT_EQ(3, batches_2.size()); |
| ASSERT_OK_AND_ASSIGN( |
| auto commit_msgs_2, |
| helper->WriteAndCommit(std::move(batches_2), commit_identifier++, std::nullopt)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot2, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot2); |
| ASSERT_EQ(2, snapshot2.value().Id()); |
| ASSERT_EQ(16, snapshot2.value().TotalRecordCount().value()); |
| ASSERT_EQ(8, snapshot2.value().DeltaRecordCount().value()); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_2, helper->Scan()); |
| ASSERT_EQ(data_splits_2.size(), 3); |
| |
| std::map<std::pair<std::string, int32_t>, std::string> expected_datas_2; |
| expected_datas_2[std::make_pair("f2=20250326/f1=20/", 1)] = R"([ |
| [0, 0, 20, "20250326", 19.1], |
| [0, 0, 20, "20250326", 20.1] |
| ])"; |
| |
| expected_datas_2[std::make_pair("f2=20250326/f1=20/", 0)] = R"([ |
| [0, 1, 20, "20250326", 21.1], |
| [0, 1, 20, "20250326", 22.1], |
| [0, 1, 20, "20250326", 23.1] |
| ])"; |
| |
| expected_datas_2[std::make_pair("f2=20250326/f1=23/", 1)] = R"([ |
| [0, 0, 23, "20250326", 24.1], |
| [0, 0, 23, "20250326", 25.1], |
| [0, 0, 23, "20250326", 26.1] |
| ])"; |
| |
| for (const auto& split : data_splits_2) { |
| auto split_impl = dynamic_cast<DataSplitImpl*>(split.get()); |
| ASSERT_OK_AND_ASSIGN(std::string partition_str, |
| helper->PartitionStr(split_impl->Partition())); |
| auto iter = expected_datas_2.find(std::make_pair(partition_str, split_impl->Bucket())); |
| ASSERT_TRUE(iter != expected_datas_2.end()); |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, {split}, iter->second)); |
| ASSERT_TRUE(success); |
| } |
| } |
| |
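// Append-only write covering nested and temporal types (map, list, struct, timestamp,
// date, decimal). Skipped for lance, which does not support the map type; note that
// parquet reports null counts of -1 for the nested columns.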
| TEST_P(WriteInteTest, TestAppendTableWriteWithComplexType) { |
| if (GetParam() == "lance") { |
| // lance do not support map |
| return; |
| } |
| auto dir = UniqueTestDirectory::Create(); |
| arrow::FieldVector fields = { |
| arrow::field("f1", arrow::map(arrow::int8(), arrow::int16())), |
| arrow::field("f2", arrow::list(arrow::float32())), |
| arrow::field("f3", arrow::struct_({arrow::field("f0", arrow::boolean()), |
| arrow::field("f1", arrow::int64())})), |
| arrow::field("f4", arrow::timestamp(arrow::TimeUnit::MILLI)), |
| arrow::field("f5", arrow::date32()), |
| arrow::field("f6", arrow::decimal128(2, 2))}; |
| auto schema = arrow::schema(fields); |
| |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "1"}, |
| {Options::BUCKET_KEY, "f5"}, {Options::FILE_SYSTEM, "local"}, |
| }; |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, |
| /*primary_keys=*/{}, options, /*is_streaming_mode=*/true)); |
| int64_t commit_identifier = 0; |
| |
| std::string data_1 = R"([ |
| [[[0, 0]], [0.1, 0.2], [true, 2], "1970-01-01 00:02:03.123", 2456, "0.22"], |
| [[[0, 1]], [0.1, 0.3], [true, 1], "1970-01-01 00:02:03.999", 24, "0.28"], |
| [[[10, 10]], [1.1, 1.2], [false, 12], "1970-01-01 00:02:03.123", 2456, "0.22"], |
| [[[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123", 245, "0.12"], |
| [[[1, 64], [2, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.0", 24, "0.78"], |
| [[[11, 64], [12, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.123", 24, "0.78"] |
| ])"; |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_1, |
| TestHelper::MakeRecordBatch( |
| arrow::struct_(fields), data_1, /*partition_map=*/{}, /*bucket=*/0, |
| {RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT, |
| RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT, |
| RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT})); |
| |
| SimpleStats value_stats = BinaryRowGenerator::GenerateStats( |
| {NullType(), NullType(), NullType(), TimestampType(Timestamp(0, 0), 3), |
| static_cast<int32_t>(24), Decimal(2, 2, DecimalUtils::StrToInt128("12").value())}, |
        {NullType(), NullType(), NullType(), TimestampType(Timestamp(123999, 0), 3),
| static_cast<int32_t>(2456), Decimal(2, 2, DecimalUtils::StrToInt128("78").value())}, |
| file_format == "parquet" ? std::vector<int64_t>({-1, -1, -1, 0, 0, 0}) |
| : std::vector<int64_t>({0, 0, 0, 0, 0, 0}), |
| pool_.get()); |
| |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/6, /*min_key=*/BinaryRow::EmptyRow(), |
| /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), value_stats, |
| /*min_sequence_number=*/0, /*max_sequence_number=*/5, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta = ReconstructDataFileMeta(file_meta); |
| DataIncrement data_increment({file_meta}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_1 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/1, data_increment, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_1 = { |
| expected_commit_message_1}; |
| |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch_1), commit_identifier++, |
| expected_commit_messages_1)); |
| |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(6, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(6, snapshot1.value().DeltaRecordCount().value()); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_1, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits_1.size(), 1); |
| std::string expected_data_1 = R"([ |
| [0, [[0, 0]], [0.1, 0.2], [true, 2], "1970-01-01 00:02:03.123", 2456, "0.22"], |
| [0, [[0, 1]], [0.1, 0.3], [true, 1], "1970-01-01 00:02:03.999", 24, "0.28"], |
| [0, [[10, 10]], [1.1, 1.2], [false, 12], "1970-01-01 00:02:03.123", 2456, "0.22"], |
| [0, [[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123", 245, "0.12"], |
| [0, [[1, 64], [2, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.0", 24, "0.78"], |
| [0, [[11, 64], [12, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.123", 24, "0.78"] |
| ])"; |
| |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, data_splits_1, expected_data_1)); |
| ASSERT_TRUE(success); |
| |
| std::string data_2 = R"([ |
| [[[10, 11]], [1.1, 1.3], [true, 1], "1970-01-01 00:02:03.999", 24, "0.28"], |
| [[[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123", 245, "0.12"], |
| [[[10, 20], [20, 30]], [2.0, 3.0], [true, 2], "1970-01-01 00:00:00.0", 24, "0.78"], |
| [[[11, 61], [21, 31]], [2.1, 3.1], [false, 1], "1970-01-01 00:00:00.123", 24, "0.78"] |
| ])"; |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_2, |
| TestHelper::MakeRecordBatch( |
| arrow::struct_(fields), data_2, /*partition_map=*/{}, /*bucket=*/0, |
| {RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT, |
| RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT})); |
| |
| SimpleStats value_stats_2 = value_stats; |
| value_stats_2.max_values_.SetInt(4, 245); |
| auto file_meta_2 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/4, /*min_key=*/BinaryRow::EmptyRow(), |
| /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), value_stats_2, |
| /*min_sequence_number=*/6, /*max_sequence_number=*/9, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_2 = ReconstructDataFileMeta(file_meta_2); |
| DataIncrement data_increment_2({file_meta_2}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_2 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/1, data_increment_2, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_2 = { |
| expected_commit_message_2}; |
| |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs_2, |
| helper->WriteAndCommit(std::move(batch_2), commit_identifier++, |
| expected_commit_messages_2)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot2, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot2); |
| ASSERT_EQ(2, snapshot2.value().Id()); |
| ASSERT_EQ(10, snapshot2.value().TotalRecordCount().value()); |
| ASSERT_EQ(4, snapshot2.value().DeltaRecordCount().value()); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_2, helper->Scan()); |
| ASSERT_EQ(data_splits_2.size(), 1); |
| std::string expected_data_2 = R"([ |
| [0, [[10, 11]], [1.1, 1.3], [true, 1], "1970-01-01 00:02:03.999", 24, "0.28"], |
| [0, [[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123", 245, "0.12"], |
| [0, [[10, 20], [20, 30]], [2.0, 3.0], [true, 2], "1970-01-01 00:00:00.0", 24, "0.78"], |
| [0, [[11, 61], [21, 31]], [2.1, 3.1], [false, 1], "1970-01-01 00:00:00.123", 24, "0.78"] |
| ])"; |
| ASSERT_OK_AND_ASSIGN(success, |
| helper->ReadAndCheckResult(data_type, data_splits_2, expected_data_2)); |
| ASSERT_TRUE(success); |
| } |
| |
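// Streaming write to a primary-key table (primary keys f0/f1, partitioned by f1,
// two buckets keyed on f0): round 1 mixes inserts with deletes, round 2 adds an
// UPDATE_AFTER, and expected commit messages, snapshots, and per-bucket read
// results are verified for both rounds.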
| TEST_P(WriteInteTest, TestPkTableStreamWrite) { |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::utf8()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; |
| auto schema = arrow::schema(fields); |
| std::vector<std::string> primary_keys = {"f0", "f1"}; |
| std::vector<std::string> partition_keys = {"f1"}; |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "2"}, |
| {Options::BUCKET_KEY, "f0"}, {Options::FILE_SYSTEM, "local"}, |
| }; |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, options, |
| /*is_streaming_mode=*/true)); |
| int64_t commit_identifier = 0; |
| ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> table_schema, |
| helper->LatestSchema()); |
| ASSERT_TRUE(table_schema); |
| |
| // round 1 write |
| DataGenerator gen(table_schema.value(), pool_); |
| std::vector<BinaryRow> datas_1; |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Alex", "20250326", 18, 10.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Bob", "20250326", 19, 11.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Cathy", "20250325", 20, 12.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "David", "20250325", 21, 13.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Evan", "20250326", 22, 14.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Delete(), "Alex", "20250326", 18, 10.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Delete(), "Bob", "20250326", 19, 11.1)); |
| ASSERT_OK_AND_ASSIGN(auto batches_1, gen.SplitArrayByPartitionAndBucket(datas_1)); |
| ASSERT_EQ(3, batches_1.size()); |
| |
| auto file_meta_1 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David")}, {std::string("David")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David"), std::string("20250325"), 21, 13.1}, |
| {std::string("David"), std::string("20250325"), 21, 13.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_1 = ReconstructDataFileMeta(file_meta_1); |
| DataIncrement data_increment_1({file_meta_1}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_1 = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250325")}, pool_.get()), |
| /*bucket=*/0, |
| /*total_bucket=*/2, data_increment_1, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_2 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Cathy")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Cathy")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Cathy")}, {std::string("Cathy")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Cathy"), std::string("20250325"), 20, 12.1}, |
| {std::string("Cathy"), std::string("20250325"), 20, 12.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_2 = ReconstructDataFileMeta(file_meta_2); |
| DataIncrement data_increment_2({file_meta_2}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_2 = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250325")}, pool_.get()), |
| /*bucket=*/1, |
| /*total_bucket=*/2, data_increment_2, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_3 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/3, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Alex")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Evan")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alex")}, {std::string("Evan")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alex"), std::string("20250326"), 18, 10.1}, |
| {std::string("Evan"), std::string("20250326"), 22, 14.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/2, /*max_sequence_number=*/4, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/2, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_3 = ReconstructDataFileMeta(file_meta_3); |
| DataIncrement data_increment_3({file_meta_3}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_3 = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250326")}, pool_.get()), |
| /*bucket=*/1, |
| /*total_bucket=*/2, data_increment_3, CompactIncrement({}, {}, {})); |
| |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_1 = { |
| expected_commit_message_1, expected_commit_message_2, expected_commit_message_3}; |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batches_1), commit_identifier++, |
| expected_commit_messages_1)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(5, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(5, snapshot1.value().DeltaRecordCount().value()); |
| |
| // round 1 read |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_1, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits_1.size(), 3); |
| |
| std::map<std::pair<std::string, int32_t>, std::string> expected_datas_1; |
| expected_datas_1[std::make_pair("f1=20250325/", 0)] = R"([[0, "David", "20250325", 21, 13.1]])"; |
| expected_datas_1[std::make_pair("f1=20250325/", 1)] = R"([[0, "Cathy", "20250325", 20, 12.1]])"; |
| expected_datas_1[std::make_pair("f1=20250326/", 1)] = R"([[0, "Evan", "20250326", 22, 14.1]])"; |
| |
| for (const auto& split : data_splits_1) { |
| auto split_impl = dynamic_cast<DataSplitImpl*>(split.get()); |
| ASSERT_OK_AND_ASSIGN(std::string partition_str, |
| helper->PartitionStr(split_impl->Partition())); |
| auto iter = expected_datas_1.find(std::make_pair(partition_str, split_impl->Bucket())); |
| ASSERT_TRUE(iter != expected_datas_1.end()); |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, {split}, iter->second)); |
| ASSERT_TRUE(success); |
| } |
| |
| // round 2 write |
| std::vector<BinaryRow> datas_2; |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Farm", "20250326", 15, 22.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Go", "20250325", 22, 23.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::UpdateAfter(), "David", "20250325", 22, 24.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Hi", "20250325", 23, 24.1)); |
| ASSERT_OK_AND_ASSIGN(auto batches_2, gen.SplitArrayByPartitionAndBucket(datas_2)); |
| ASSERT_EQ(3, batches_2.size()); |
| |
| auto file_meta_4 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David")}, {std::string("David")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David"), std::string("20250325"), 22, 24.1}, |
| {std::string("David"), std::string("20250325"), 22, 24.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_4 = ReconstructDataFileMeta(file_meta_4); |
| DataIncrement data_increment_4({file_meta_4}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_4 = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250325")}, pool_.get()), |
| /*bucket=*/0, |
| /*total_bucket=*/2, data_increment_4, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_5 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/2, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Go")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Hi")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Go")}, {std::string("Hi")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Go"), std::string("20250325"), 22, 23.1}, |
| {std::string("Hi"), std::string("20250325"), 23, 24.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/1, /*max_sequence_number=*/2, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_5 = ReconstructDataFileMeta(file_meta_5); |
| DataIncrement data_increment_5({file_meta_5}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_5 = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250325")}, pool_.get()), |
| /*bucket=*/1, |
| /*total_bucket=*/2, data_increment_5, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_6 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Farm")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Farm")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Farm")}, {std::string("Farm")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Farm"), std::string("20250326"), 15, 22.1}, |
| {std::string("Farm"), std::string("20250326"), 15, 22.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_6 = ReconstructDataFileMeta(file_meta_6); |
| DataIncrement data_increment_6({file_meta_6}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_6 = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250326")}, pool_.get()), |
| /*bucket=*/0, |
| /*total_bucket=*/2, data_increment_6, CompactIncrement({}, {}, {})); |
| |
| std::shared_ptr<CommitMessage> expected_commit_message_7 = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250326")}, pool_.get()), |
| /*bucket=*/1, |
| /*total_bucket=*/2, DataIncrement({}, {}, {}), CompactIncrement({}, {}, {})); |
| |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_2 = { |
| expected_commit_message_4, expected_commit_message_5, expected_commit_message_6, |
| expected_commit_message_7}; |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs_2, |
| helper->WriteAndCommit(std::move(batches_2), commit_identifier++, |
| expected_commit_messages_2)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot2, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot2); |
| ASSERT_EQ(2, snapshot2.value().Id()); |
| ASSERT_EQ(9, snapshot2.value().TotalRecordCount().value()); |
| ASSERT_EQ(4, snapshot2.value().DeltaRecordCount().value()); |
| |
| // round 2 read |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_2, helper->Scan()); |
| ASSERT_EQ(data_splits_2.size(), 3); |
| std::map<std::pair<std::string, int32_t>, std::string> expected_datas_2; |
| expected_datas_2[std::make_pair("f1=20250326/", 0)] = R"([[0, "Farm", "20250326", 15, 22.1]])"; |
| expected_datas_2[std::make_pair("f1=20250325/", 0)] = R"([[2, "David", "20250325", 22, 24.1]])"; |
| expected_datas_2[std::make_pair("f1=20250325/", 1)] = |
| R"([[0, "Go", "20250325", 22, 23.1], [0, "Hi", "20250325", 23, 24.1]])"; |
| |
| for (const auto& split : data_splits_2) { |
| auto split_impl = dynamic_cast<DataSplitImpl*>(split.get()); |
| ASSERT_OK_AND_ASSIGN(std::string partition_str, |
| helper->PartitionStr(split_impl->Partition())); |
| auto iter = expected_datas_2.find(std::make_pair(partition_str, split_impl->Bucket())); |
| ASSERT_TRUE(iter != expected_datas_2.end()) << partition_str << " " << split_impl->Bucket(); |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, {split}, iter->second)); |
| EXPECT_TRUE(success); |
| } |
| } |
| |
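// Batch-mode counterpart of TestPkTableStreamWrite: the same mix of inserts and
// deletes is written to the primary-key table in one batch commit and checked
// against the expected per-partition, per-bucket commit messages.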
| TEST_P(WriteInteTest, TestPkTableBatchWrite) { |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::utf8()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; |
| auto schema = arrow::schema(fields); |
| std::vector<std::string> primary_keys = {"f0", "f1"}; |
| std::vector<std::string> partition_keys = {"f1"}; |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "2"}, |
| {Options::BUCKET_KEY, "f0"}, {Options::FILE_SYSTEM, "local"}, |
| }; |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, options, |
| /*is_streaming_mode=*/false)); |
| int64_t commit_identifier = 0; |
| ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> table_schema, |
| helper->LatestSchema()); |
| ASSERT_TRUE(table_schema); |
| |
| DataGenerator gen(table_schema.value(), pool_); |
| std::vector<BinaryRow> datas_1; |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Alex", "20250326", 18, 10.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Bob", "20250326", 19, 11.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Cathy", "20250325", 20, 12.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "David", "20250325", 21, 13.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Evan", "20250326", 22, 14.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Delete(), "Alex", "20250326", 18, 10.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Delete(), "Bob", "20250326", 19, 11.1)); |
| ASSERT_OK_AND_ASSIGN(auto batches_1, gen.SplitArrayByPartitionAndBucket(datas_1)); |
| ASSERT_EQ(3, batches_1.size()); |
| auto file_meta_1 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David")}, {std::string("David")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David"), std::string("20250325"), 21, 13.1}, |
| {std::string("David"), std::string("20250325"), 21, 13.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_1 = ReconstructDataFileMeta(file_meta_1); |
| DataIncrement data_increment_1({file_meta_1}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_1 = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250325")}, pool_.get()), |
| /*bucket=*/0, |
| /*total_bucket=*/2, data_increment_1, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_2 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Cathy")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Cathy")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Cathy")}, {std::string("Cathy")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Cathy"), std::string("20250325"), 20, 12.1}, |
| {std::string("Cathy"), std::string("20250325"), 20, 12.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_2 = ReconstructDataFileMeta(file_meta_2); |
| DataIncrement data_increment_2({file_meta_2}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_2 = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250325")}, pool_.get()), |
| /*bucket=*/1, |
| /*total_bucket=*/2, data_increment_2, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_3 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/3, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Alex")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Evan")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alex")}, {std::string("Evan")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alex"), std::string("20250326"), 18, 10.1}, |
| {std::string("Evan"), std::string("20250326"), 22, 14.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/2, /*max_sequence_number=*/4, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/2, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_3 = ReconstructDataFileMeta(file_meta_3); |
| DataIncrement data_increment_3({file_meta_3}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_3 = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250326")}, pool_.get()), |
| /*bucket=*/1, |
| /*total_bucket=*/2, data_increment_3, CompactIncrement({}, {}, {})); |
| |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_1 = { |
| expected_commit_message_1, expected_commit_message_2, expected_commit_message_3}; |
| |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batches_1), commit_identifier++, |
| expected_commit_messages_1)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(5, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(5, snapshot1.value().DeltaRecordCount().value()); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_1, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits_1.size(), 3); |
| |
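| // the reader prepends a _VALUE_KIND system column (row kind), so extend the |
| // expected read schema accordingly |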
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| |
| std::map<std::pair<std::string, int32_t>, std::string> expected_datas_1; |
| expected_datas_1[std::make_pair("f1=20250325/", 0)] = R"([[0, "David", "20250325", 21, 13.1]])"; |
| expected_datas_1[std::make_pair("f1=20250325/", 1)] = R"([[0, "Cathy", "20250325", 20, 12.1]])"; |
| expected_datas_1[std::make_pair("f1=20250326/", 1)] = R"([[0, "Evan", "20250326", 22, 14.1]])"; |
| |
| for (const auto& split : data_splits_1) { |
| auto split_impl = dynamic_cast<DataSplitImpl*>(split.get()); |
| ASSERT_OK_AND_ASSIGN(std::string partition_str, |
| helper->PartitionStr(split_impl->Partition())); |
| auto iter = expected_datas_1.find(std::make_pair(partition_str, split_impl->Bucket())); |
| ASSERT_TRUE(iter != expected_datas_1.end()); |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, {split}, iter->second)); |
| ASSERT_TRUE(success); |
| } |
| |
| std::vector<BinaryRow> datas_2; |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Farm", "20250326", 15, 22.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Go", "20250325", 22, 23.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::UpdateAfter(), "David", "20250325", 22, 24.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Hi", "20250325", 23, 24.1)); |
| ASSERT_OK_AND_ASSIGN(auto batches_2, gen.SplitArrayByPartitionAndBucket(datas_2)); |
| ASSERT_EQ(3, batches_2.size()); |
| ASSERT_NOK_WITH_MSG(helper->WriteAndCommit(std::move(batches_2), commit_identifier++, |
| /*expected_commit_messages=*/std::nullopt), |
| "batch write mode only support one-time committing."); |
| } |
| |
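| // Streaming write on a primary-key table without partition keys: commit |
| // messages carry an empty partition row, and two write/commit rounds |
| // advance the snapshot while rows are merged by primary key on read. |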
| TEST_P(WriteInteTest, TestPkTableWriteWithNoPartitionKey) { |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::utf8()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; |
| auto schema = arrow::schema(fields); |
| std::vector<std::string> primary_keys = {"f0", "f1"}; |
| std::vector<std::string> partition_keys = {}; |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "2"}, |
| {Options::BUCKET_KEY, "f0"}, {Options::FILE_SYSTEM, "local"}, |
| }; |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, options, |
| /*is_streaming_mode=*/true)); |
| int64_t commit_identifier = 0; |
| ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> table_schema, |
| helper->LatestSchema()); |
| ASSERT_TRUE(table_schema); |
| |
| DataGenerator gen(table_schema.value(), pool_); |
| std::vector<BinaryRow> datas_1; |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Alex", "20250326", 18, 10.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Bob", "20250326", 19, 11.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Cathy", "20250325", 20, 12.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "David", "20250325", 21, 13.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Evan", "20250326", 22, 14.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Delete(), "Alex", "20250326", 18, 10.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Delete(), "Bob", "20250326", 19, 11.1)); |
| ASSERT_OK_AND_ASSIGN(auto batches_1, gen.SplitArrayByPartitionAndBucket(datas_1)); |
| ASSERT_EQ(2, batches_1.size()); |
| |
| auto file_meta_1 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/ |
| BinaryRowGenerator::GenerateRow({std::string("David"), std::string("20250325")}, |
| pool_.get()), |
| /*max_key=*/ |
| BinaryRowGenerator::GenerateRow({std::string("David"), std::string("20250325")}, |
| pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David"), std::string("20250325")}, |
| {std::string("David"), std::string("20250325")}, {0, 0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David"), std::string("20250325"), 21, 13.1}, |
| {std::string("David"), std::string("20250325"), 21, 13.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_1 = ReconstructDataFileMeta(file_meta_1); |
| DataIncrement data_increment_1({file_meta_1}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_1 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/2, data_increment_1, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_2 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/4, |
| /*min_key=*/ |
| BinaryRowGenerator::GenerateRow({std::string("Alex"), std::string("20250326")}, |
| pool_.get()), |
| /*max_key=*/ |
| BinaryRowGenerator::GenerateRow({std::string("Evan"), std::string("20250326")}, |
| pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alex"), std::string("20250325")}, |
| {std::string("Evan"), std::string("20250326")}, {0, 0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alex"), std::string("20250325"), 18, 10.1}, |
| {std::string("Evan"), std::string("20250326"), 22, 14.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/2, /*max_sequence_number=*/5, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/2, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_2 = ReconstructDataFileMeta(file_meta_2); |
| DataIncrement data_increment_2({file_meta_2}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_2 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/1, |
| /*total_bucket=*/2, data_increment_2, CompactIncrement({}, {}, {})); |
| |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_1 = { |
| expected_commit_message_1, expected_commit_message_2}; |
| |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batches_1), commit_identifier++, |
| expected_commit_messages_1)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(5, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(5, snapshot1.value().DeltaRecordCount().value()); |
| |
| // round 1 read |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_1, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits_1.size(), 2); |
| |
| std::map<int32_t, std::string> expected_datas_1; |
| expected_datas_1[0] = R"([[0, "David", "20250325", 21, 13.1]])"; |
| expected_datas_1[1] = |
| R"([[0, "Cathy", "20250325", 20, 12.1], [0, "Evan", "20250326", 22, 14.1]])"; |
| for (const auto& split : data_splits_1) { |
| auto split_impl = dynamic_cast<DataSplitImpl*>(split.get()); |
| ASSERT_OK_AND_ASSIGN(std::string partition_str, |
| helper->PartitionStr(split_impl->Partition())); |
| auto iter = expected_datas_1.find(split_impl->Bucket()); |
| ASSERT_TRUE(iter != expected_datas_1.end()); |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, {split}, iter->second)); |
| ASSERT_TRUE(success); |
| } |
| |
| // round 2 write |
| std::vector<BinaryRow> datas_2; |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Farm", "20250326", 15, 22.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Go", "20250325", 22, 23.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::UpdateAfter(), "David", "20250325", 22, 24.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Hi", "20250325", 23, 24.1)); |
| ASSERT_OK_AND_ASSIGN(auto batches_2, gen.SplitArrayByPartitionAndBucket(datas_2)); |
| ASSERT_EQ(2, batches_2.size()); |
| |
| auto file_meta_3 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/2, |
| /*min_key=*/ |
| BinaryRowGenerator::GenerateRow({std::string("David"), std::string("20250325")}, |
| pool_.get()), |
| /*max_key=*/ |
| BinaryRowGenerator::GenerateRow({std::string("Farm"), std::string("20250326")}, |
| pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David"), std::string("20250325")}, |
| {std::string("Farm"), std::string("20250326")}, {0, 0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David"), std::string("20250325"), 15, 22.1}, |
| {std::string("Farm"), std::string("20250326"), 22, 24.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/1, /*max_sequence_number=*/2, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_3 = ReconstructDataFileMeta(file_meta_3); |
| DataIncrement data_increment_3({file_meta_3}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_3 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/2, data_increment_3, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_4 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/2, |
| /*min_key=*/ |
| BinaryRowGenerator::GenerateRow({std::string("Go"), std::string("20250325")}, pool_.get()), |
| /*max_key=*/ |
| BinaryRowGenerator::GenerateRow({std::string("Hi"), std::string("20250325")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Go"), std::string("20250325")}, |
| {std::string("Hi"), std::string("20250325")}, {0, 0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Go"), std::string("20250325"), 22, 23.1}, |
| {std::string("Hi"), std::string("20250325"), 23, 24.1}, |
| {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/6, /*max_sequence_number=*/7, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_4 = ReconstructDataFileMeta(file_meta_4); |
| DataIncrement data_increment_4({file_meta_4}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_4 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/1, |
| /*total_bucket=*/2, data_increment_4, CompactIncrement({}, {}, {})); |
| |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_2 = { |
| expected_commit_message_3, expected_commit_message_4}; |
| |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs_2, |
| helper->WriteAndCommit(std::move(batches_2), commit_identifier++, |
| expected_commit_messages_2)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot2, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot2); |
| ASSERT_EQ(2, snapshot2.value().Id()); |
| ASSERT_EQ(9, snapshot2.value().TotalRecordCount().value()); |
| ASSERT_EQ(4, snapshot2.value().DeltaRecordCount().value()); |
| |
| // round 2 read |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_2, helper->Scan()); |
| ASSERT_EQ(data_splits_2.size(), 2); |
| std::map<int32_t, std::string> expected_datas_2; |
| expected_datas_2[0] = |
| R"([[2, "David", "20250325", 22, 24.1],[0, "Farm", "20250326", 15, 22.1]])"; |
| expected_datas_2[1] = R"([[0, "Go", "20250325", 22, 23.1], [0, "Hi", "20250325", 23, 24.1]])"; |
| |
| for (const auto& split : data_splits_2) { |
| auto split_impl = dynamic_cast<DataSplitImpl*>(split.get()); |
| ASSERT_OK_AND_ASSIGN(std::string partition_str, |
| helper->PartitionStr(split_impl->Partition())); |
| auto iter = expected_datas_2.find(split_impl->Bucket()); |
| ASSERT_TRUE(iter != expected_datas_2.end()) << partition_str << " " << split_impl->Bucket(); |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, {split}, iter->second)); |
| EXPECT_TRUE(success); |
| } |
| } |
| |
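| // Primary-key table with nested and temporal types (map, list, struct, |
| // timestamp, date, decimal): verifies stats for complex columns and that |
| // streaming reads emit the expected row kinds after a second commit. |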
| TEST_P(WriteInteTest, TestPkTableWriteWithComplexType) { |
| if (GetParam() == "lance") { |
| // lance does not support the map type |
| return; |
| } |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| arrow::FieldVector fields = { |
| arrow::field("f1", arrow::map(arrow::int8(), arrow::int16())), |
| arrow::field("f2", arrow::list(arrow::float32())), |
| arrow::field("f3", arrow::struct_({arrow::field("f0", arrow::boolean()), |
| arrow::field("f1", arrow::int64())})), |
| arrow::field("f4", arrow::timestamp(arrow::TimeUnit::MILLI)), |
| arrow::field("f5", arrow::date32()), |
| arrow::field("f6", arrow::decimal128(2, 2))}; |
| auto schema = arrow::schema(fields); |
| std::vector<std::string> primary_keys = {"f6", "f4", "f5"}; |
| std::vector<std::string> partition_keys = {}; |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "1"}, |
| {Options::BUCKET_KEY, "f5"}, {Options::FILE_SYSTEM, "local"}, |
| }; |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, options, |
| /*is_streaming_mode=*/true)); |
| int64_t commit_identifier = 0; |
| std::string data_1 = R"([ |
| [[[0, 0]], [0.1, 0.2], [true, 2], "1970-01-01 00:02:03.123", 2456, "0.22"], |
| [[[0, 1]], [0.1, 0.3], [true, 1], "1970-01-01 00:02:03.999", 24, "0.28"], |
| [[[10, 10]], [1.1, 1.2], [false, 12], "1970-01-01 00:02:03.123", 2456, "0.22"], |
| [[[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123", 245, "0.12"], |
| [[[1, 64], [2, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.0", 24, "0.78"], |
| [[[11, 64], [12, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.123", 24, "0.78"] |
| ])"; |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_1, |
| TestHelper::MakeRecordBatch( |
| arrow::struct_(fields), data_1, /*partition_map=*/{}, /*bucket=*/0, |
| {RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT, |
| RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT, |
| RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT})); |
| |
| auto min_key = BinaryRowGenerator::GenerateRow( |
| {Decimal(2, 2, DecimalUtils::StrToInt128("12").value()), |
| TimestampType(Timestamp(123123, 0), 3), static_cast<int32_t>(245)}, |
| pool_.get()); |
| auto max_key = BinaryRowGenerator::GenerateRow( |
| {Decimal(2, 2, DecimalUtils::StrToInt128("78").value()), |
| TimestampType(Timestamp(123, 0), 3), static_cast<int32_t>(24)}, |
| pool_.get()); |
| SimpleStats key_stats = BinaryRowGenerator::GenerateStats( |
| {Decimal(2, 2, DecimalUtils::StrToInt128("12").value()), TimestampType(Timestamp(0, 0), 3), |
| static_cast<int32_t>(24)}, |
| {Decimal(2, 2, DecimalUtils::StrToInt128("78").value()), |
| TimestampType(Timestamp(123999, 0), 3), static_cast<int32_t>(2456)}, |
| std::vector<int64_t>({0, 0, 0}), pool_.get()); |
| SimpleStats value_stats = BinaryRowGenerator::GenerateStats( |
| {NullType(), NullType(), NullType(), TimestampType(Timestamp(0, 0), 3), |
| static_cast<int32_t>(24), Decimal(2, 2, DecimalUtils::StrToInt128("12").value())}, |
| {NullType(), NullType(), NullType(), TimestampType(Timestamp(123999, 0), 3), |
| static_cast<int32_t>(2456), Decimal(2, 2, DecimalUtils::StrToInt128("78").value())}, |
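| // null counts: parquet is assumed to report nulls as unknown (-1) for the |
| // nested map/list/struct columns, while other formats report 0 |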
| file_format == "parquet" ? std::vector<int64_t>({-1, -1, -1, 0, 0, 0}) |
| : std::vector<int64_t>({0, 0, 0, 0, 0, 0}), |
| pool_.get()); |
| |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/5, min_key, max_key, key_stats, value_stats, |
| /*min_sequence_number=*/1, /*max_sequence_number=*/5, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta = ReconstructDataFileMeta(file_meta); |
| DataIncrement data_increment({file_meta}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_1 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/1, data_increment, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_1 = { |
| expected_commit_message_1}; |
| |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch_1), commit_identifier++, |
| expected_commit_messages_1)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(5, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(5, snapshot1.value().DeltaRecordCount().value()); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_1, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits_1.size(), 1); |
| std::string expected_data_1 = R"([ |
| [0, [[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123", 245, "0.12"], |
| [0, [[10, 10]], [1.1, 1.2], [false, 12], "1970-01-01 00:02:03.123", 2456, "0.22"], |
| [0, [[0, 1]], [0.1, 0.3], [true, 1], "1970-01-01 00:02:03.999", 24, "0.28"], |
| [0, [[1, 64], [2, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.0", 24, "0.78"], |
| [0, [[11, 64], [12, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.123", 24, "0.78"] |
| ])"; |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, data_splits_1, expected_data_1)); |
| ASSERT_TRUE(success); |
| |
| std::string data_2 = R"([ |
| [[[10, 11]], [1.1, 1.3], [true, 1], "1970-01-01 00:02:03.999", 24, "0.28"], |
| [[[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123", 245, "0.12"], |
| [[[10, 20], [20, 30]], [2.0, 3.0], [true, 2], "1970-01-01 00:00:00.0", 24, "0.78"], |
| [[[11, 61], [21, 31]], [2.1, 3.1], [false, 1], "1970-01-01 00:00:00.123", 24, "0.78"] |
| ])"; |
| ASSERT_OK_AND_ASSIGN( |
| std::unique_ptr<RecordBatch> batch_2, |
| TestHelper::MakeRecordBatch( |
| arrow::struct_(fields), data_2, /*partition_map=*/{}, /*bucket=*/0, |
| {RecordBatch::RowKind::INSERT, RecordBatch::RowKind::DELETE, |
| RecordBatch::RowKind::UPDATE_BEFORE, RecordBatch::RowKind::UPDATE_AFTER})); |
| |
| auto min_key_2 = BinaryRowGenerator::GenerateRow( |
| {Decimal(2, 2, DecimalUtils::StrToInt128("12").value()), |
| TimestampType(Timestamp(123123, 0), 3), static_cast<int32_t>(245)}, |
| pool_.get()); |
| auto max_key_2 = BinaryRowGenerator::GenerateRow( |
| {Decimal(2, 2, DecimalUtils::StrToInt128("78").value()), |
| TimestampType(Timestamp(123, 0), 3), static_cast<int32_t>(24)}, |
| pool_.get()); |
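| // round-2 stats reuse the round-1 stats; only the max of the int32 column |
| // (index 2 in the key stats, index 4 in the value stats) rises to 245 |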
| auto key_stats_2 = key_stats; |
| key_stats_2.max_values_.SetInt(2, 245); |
| |
| auto value_stats_2 = value_stats; |
| value_stats_2.max_values_.SetInt(4, 245); |
| |
| auto file_meta_2 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/4, min_key_2, max_key_2, key_stats_2, value_stats_2, |
| /*min_sequence_number=*/6, /*max_sequence_number=*/9, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/2, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_2 = ReconstructDataFileMeta(file_meta_2); |
| DataIncrement data_increment_2({file_meta_2}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_2 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/1, data_increment_2, CompactIncrement({}, {}, {})); |
| |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_2 = { |
| expected_commit_message_2}; |
| |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs_2, |
| helper->WriteAndCommit(std::move(batch_2), commit_identifier++, |
| expected_commit_messages_2)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot2, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot2); |
| ASSERT_EQ(2, snapshot2.value().Id()); |
| ASSERT_EQ(9, snapshot2.value().TotalRecordCount().value()); |
| ASSERT_EQ(4, snapshot2.value().DeltaRecordCount().value()); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_2, helper->Scan()); |
| ASSERT_EQ(data_splits_2.size(), 1); |
| std::string expected_data_2 = R"([ |
| [3, [[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123", 245, "0.12"], |
| [0, [[10, 11]], [1.1, 1.3], [true, 1], "1970-01-01 00:02:03.999", 24, "0.28"], |
| [1, [[10, 20], [20, 30]], [2.0, 3.0], [true, 2], "1970-01-01 00:00:00.0", 24, "0.78"], |
| [2, [[11, 61], [21, 31]], [2.1, 3.1], [false, 1], "1970-01-01 00:00:00.123", 24, "0.78"] |
| ])"; |
| |
| ASSERT_OK_AND_ASSIGN(success, |
| helper->ReadAndCheckResult(data_type, data_splits_2, expected_data_2)); |
| ASSERT_TRUE(success); |
| } |
| |
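| // With FORCE_LOOKUP enabled, deletes and updates are resolved during the |
| // write via lookup, so the first snapshot's record counts already reflect |
| // the merge and a batch scan returns only the surviving rows. |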
| TEST_P(WriteInteTest, TestPkTableForceLookup) { |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::utf8()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; |
| auto schema = arrow::schema(fields); |
| std::vector<std::string> primary_keys = {"f0", "f1"}; |
| std::vector<std::string> partition_keys = {}; |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "1"}, |
| {Options::BUCKET_KEY, "f0"}, {Options::FILE_SYSTEM, "local"}, |
| {Options::FORCE_LOOKUP, "true"}, |
| }; |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, options, |
| /*is_streaming_mode=*/true)); |
| int64_t commit_identifier = 0; |
| ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> table_schema, |
| helper->LatestSchema()); |
| ASSERT_TRUE(table_schema); |
| |
| DataGenerator gen(table_schema.value(), pool_); |
| std::vector<BinaryRow> data; |
| data.push_back(MakeBinaryRow(RowKind::Insert(), "Alex", "20250326", 18, 10.1)); |
| data.push_back(MakeBinaryRow(RowKind::Insert(), "Bob", "20250326", 19, 11.1)); |
| data.push_back(MakeBinaryRow(RowKind::Insert(), "Cathy", "20250325", 20, 12.1)); |
| data.push_back(MakeBinaryRow(RowKind::Insert(), "Evan", "20250326", 22, 14.1)); |
| data.push_back(MakeBinaryRow(RowKind::Delete(), "Alex", "20250326", 18, 10.1)); |
| data.push_back(MakeBinaryRow(RowKind::Delete(), "Bob", "20250326", 19, 11.1)); |
| data.push_back(MakeBinaryRow(RowKind::Insert(), "Cathy", "20250325", 120, 112.1)); |
| ASSERT_OK_AND_ASSIGN(auto batches, gen.SplitArrayByPartitionAndBucket(data)); |
| ASSERT_EQ(1, batches.size()); |
| |
| ASSERT_OK_AND_ASSIGN( |
| auto commit_msgs, |
| helper->WriteAndCommit(std::move(batches), commit_identifier++, std::nullopt)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(4, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(4, snapshot1.value().DeltaRecordCount().value()); |
| |
| // read |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt, |
| /*is_streaming=*/false)); |
| ASSERT_EQ(data_splits.size(), 1); |
| std::string expected_data = R"([ |
| [0, "Cathy", "20250325", 120, 112.1], [0, "Evan", "20250326", 22, 14.1] |
| ])"; |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, data_splits, expected_data)); |
| ASSERT_TRUE(success); |
| } |
| |
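| // With deletion vectors enabled, the batch scan right after the first |
| // commit yields no splits, presumably because uncompacted level-0 files are |
| // not exposed to non-streaming reads. |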
| TEST_P(WriteInteTest, TestPkTableEnableDeletionVector) { |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::utf8()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; |
| auto schema = arrow::schema(fields); |
| std::vector<std::string> primary_keys = {"f0", "f1"}; |
| std::vector<std::string> partition_keys = {}; |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::BUCKET, "1"}, |
| {Options::BUCKET_KEY, "f0"}, |
| {Options::FILE_SYSTEM, "local"}, |
| {Options::DELETION_VECTORS_ENABLED, "true"}, |
| }; |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, options, |
| /*is_streaming_mode=*/true)); |
| int64_t commit_identifier = 0; |
| ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> table_schema, |
| helper->LatestSchema()); |
| ASSERT_TRUE(table_schema); |
| |
| DataGenerator gen(table_schema.value(), pool_); |
| std::vector<BinaryRow> data; |
| data.push_back(MakeBinaryRow(RowKind::Insert(), "Alex", "20250326", 18, 10.1)); |
| data.push_back(MakeBinaryRow(RowKind::Insert(), "Bob", "20250326", 19, 11.1)); |
| data.push_back(MakeBinaryRow(RowKind::Insert(), "Cathy", "20250325", 20, 12.1)); |
| data.push_back(MakeBinaryRow(RowKind::Insert(), "Evan", "20250326", 22, 14.1)); |
| data.push_back(MakeBinaryRow(RowKind::Delete(), "Alex", "20250326", 18, 10.1)); |
| data.push_back(MakeBinaryRow(RowKind::Delete(), "Bob", "20250326", 19, 11.1)); |
| data.push_back(MakeBinaryRow(RowKind::Insert(), "Cathy", "20250325", 120, 112.1)); |
| ASSERT_OK_AND_ASSIGN(auto batches, gen.SplitArrayByPartitionAndBucket(data)); |
| ASSERT_EQ(1, batches.size()); |
| |
| ASSERT_OK_AND_ASSIGN( |
| auto commit_msgs, |
| helper->WriteAndCommit(std::move(batches), commit_identifier++, std::nullopt)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(4, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(4, snapshot1.value().DeltaRecordCount().value()); |
| |
| // read |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt, |
| /*is_streaming=*/false)); |
| ASSERT_TRUE(data_splits.empty()); |
| } |
| |
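| // Fault injection for the full create/write/prepare-commit flow: an IOHook |
| // error is injected at the i-th IO call each iteration until one iteration |
| // completes cleanly, after which the commit messages are verified. |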
| TEST_P(WriteInteTest, TestPkTableWriteWithIOException) { |
| ::testing::GTEST_FLAG(throw_on_failure) = true; |
| // create table |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::utf8()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; |
| auto schema = arrow::schema(fields); |
| std::vector<std::string> primary_keys = {"f0", "f1"}; |
| std::vector<std::string> partition_keys = {"f1"}; |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "2"}, |
| {Options::BUCKET_KEY, "f0"}, {Options::FILE_SYSTEM, "local"}, |
| }; |
| bool run_complete = false; |
| auto io_hook = IOHook::GetInstance(); |
| |
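| // advance the injection point each round: the i-th IO call returns an |
| // error, and CHECK_HOOK_STATUS short-circuits the iteration once the |
| // injected error surfaces |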
| for (size_t i = 0; i < 500; i++) { |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); |
| io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); |
| ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), options)); |
| CHECK_HOOK_STATUS(catalog->CreateDatabase("foo", options, /*ignore_if_exists=*/false), i); |
| ::ArrowSchema c_schema; |
| ScopeGuard arrow_guard([&c_schema]() { ArrowSchemaRelease(&c_schema); }); |
| ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); |
| CHECK_HOOK_STATUS(catalog->CreateTable(Identifier("foo", "bar"), &c_schema, partition_keys, |
| primary_keys, options, /*ignore_if_exists=*/false), |
| i); |
| std::string root_path = PathUtil::JoinPath(dir->Str(), "foo.db/bar"); |
| SchemaManager schema_manager(file_system_, root_path); |
| auto table_schema_result = schema_manager.ReadSchema(/*schema_id=*/0); |
| CHECK_HOOK_STATUS(table_schema_result.status(), i); |
| std::shared_ptr<TableSchema> table_schema = table_schema_result.value(); |
| |
| // prepare data |
| DataGenerator gen(table_schema, pool_); |
| std::vector<BinaryRow> datas_1; |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Alex", "20250326", 18, 10.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Bob", "20250326", 19, 11.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Cathy", "20250325", 20, 12.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "David", "20250325", 21, 13.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Insert(), "Evan", "20250326", 22, 14.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Delete(), "Alex", "20250326", 18, 10.1)); |
| datas_1.push_back(MakeBinaryRow(RowKind::Delete(), "Bob", "20250326", 19, 11.1)); |
| ASSERT_OK_AND_ASSIGN(auto batches_1, gen.SplitArrayByPartitionAndBucket(datas_1)); |
| ASSERT_EQ(3, batches_1.size()); |
| |
| std::vector<BinaryRow> datas_2; |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Farm", "20250326", 15, 22.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Go", "20250325", 22, 23.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::UpdateAfter(), "David", "20250325", 22, 24.1)); |
| datas_2.push_back(MakeBinaryRow(RowKind::Insert(), "Hi", "20250325", 23, 24.1)); |
| ASSERT_OK_AND_ASSIGN(auto batches_2, gen.SplitArrayByPartitionAndBucket(datas_2)); |
| ASSERT_EQ(3, batches_2.size()); |
| |
| // write data |
| WriteContextBuilder context_builder(root_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options).WithStreamingMode(true).Finish()); |
| Result<std::unique_ptr<FileStoreWrite>> write = |
| FileStoreWrite::Create(std::move(write_context)); |
| CHECK_HOOK_STATUS(write.status(), i); |
| auto& file_store_write = write.value(); |
| // round 1 |
| CHECK_HOOK_STATUS(file_store_write->Write(std::move(batches_1[0])), i); |
| CHECK_HOOK_STATUS(file_store_write->Write(std::move(batches_1[1])), i); |
| CHECK_HOOK_STATUS(file_store_write->Write(std::move(batches_1[2])), i); |
| Result<std::vector<std::shared_ptr<CommitMessage>>> results_1 = |
| file_store_write->PrepareCommit(/*wait_compaction=*/false, 0); |
| CHECK_HOOK_STATUS(results_1.status(), i); |
| std::vector<std::shared_ptr<CommitMessage>> results_1_value = results_1.value(); |
| ASSERT_EQ(results_1_value.size(), 3); |
| // round 2 |
| CHECK_HOOK_STATUS(file_store_write->Write(std::move(batches_2[0])), i); |
| CHECK_HOOK_STATUS(file_store_write->Write(std::move(batches_2[1])), i); |
| CHECK_HOOK_STATUS(file_store_write->Write(std::move(batches_2[2])), i); |
| Result<std::vector<std::shared_ptr<CommitMessage>>> results_2 = |
| file_store_write->PrepareCommit(/*wait_compaction=*/false, 1); |
| CHECK_HOOK_STATUS(results_2.status(), i); |
| std::vector<std::shared_ptr<CommitMessage>> results_2_value = results_2.value(); |
| ASSERT_EQ(results_2_value.size(), 4); |
| io_hook->Clear(); |
| |
| std::vector<std::string> subdirs = {"f1=20250325/bucket-0", "f1=20250325/bucket-1", |
| "f1=20250326/bucket-0", "f1=20250326/bucket-1"}; |
| CheckFileCount(root_path, subdirs, /*expect_file_count=*/6); |
| |
| auto file_meta_1 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David")}, {std::string("David")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats( |
| {std::string("David"), std::string("20250325"), 21, 13.1}, |
| {std::string("David"), std::string("20250325"), 21, 13.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_1 = ReconstructDataFileMeta(file_meta_1); |
| DataIncrement data_increment_1({file_meta_1}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_1 = |
| std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250325")}, |
| pool_.get()), |
| /*bucket=*/0, |
| /*total_bucket=*/2, data_increment_1, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_2 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Cathy")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Cathy")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Cathy")}, {std::string("Cathy")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats( |
| {std::string("Cathy"), std::string("20250325"), 20, 12.1}, |
| {std::string("Cathy"), std::string("20250325"), 20, 12.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_2 = ReconstructDataFileMeta(file_meta_2); |
| DataIncrement data_increment_2({file_meta_2}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_2 = |
| std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250325")}, |
| pool_.get()), |
| /*bucket=*/1, |
| /*total_bucket=*/2, data_increment_2, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_3 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/3, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Alex")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Evan")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alex")}, {std::string("Evan")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats( |
| {std::string("Alex"), std::string("20250326"), 18, 10.1}, |
| {std::string("Evan"), std::string("20250326"), 22, 14.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/2, /*max_sequence_number=*/4, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/2, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_3 = ReconstructDataFileMeta(file_meta_3); |
| DataIncrement data_increment_3({file_meta_3}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_3 = |
| std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250326")}, |
| pool_.get()), |
| /*bucket=*/1, |
| /*total_bucket=*/2, data_increment_3, CompactIncrement({}, {}, {})); |
| |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_1 = { |
| expected_commit_message_1, expected_commit_message_2, expected_commit_message_3}; |
| |
| auto file_meta_4 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("David")}, {std::string("David")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats( |
| {std::string("David"), std::string("20250325"), 22, 24.1}, |
| {std::string("David"), std::string("20250325"), 22, 24.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_4 = ReconstructDataFileMeta(file_meta_4); |
| DataIncrement data_increment_4({file_meta_4}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_4 = |
| std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250325")}, |
| pool_.get()), |
| /*bucket=*/0, |
| /*total_bucket=*/2, data_increment_4, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_5 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/2, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Go")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Hi")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Go")}, {std::string("Hi")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats( |
| {std::string("Go"), std::string("20250325"), 22, 23.1}, |
| {std::string("Hi"), std::string("20250325"), 23, 24.1}, {0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/1, /*max_sequence_number=*/2, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_5 = ReconstructDataFileMeta(file_meta_5); |
| DataIncrement data_increment_5({file_meta_5}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_5 = |
| std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250325")}, |
| pool_.get()), |
| /*bucket=*/1, |
| /*total_bucket=*/2, data_increment_5, CompactIncrement({}, {}, {})); |
| |
| auto file_meta_6 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Farm")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Farm")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Farm")}, {std::string("Farm")}, {0}, |
| pool_.get()), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats( |
| {std::string("Farm"), std::string("20250326"), 15, 22.1}, |
| {std::string("Farm"), std::string("20250326"), 15, 22.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta_6 = ReconstructDataFileMeta(file_meta_6); |
| DataIncrement data_increment_6({file_meta_6}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_6 = |
| std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250326")}, |
| pool_.get()), |
| /*bucket=*/0, |
| /*total_bucket=*/2, data_increment_6, CompactIncrement({}, {}, {})); |
| |
| std::shared_ptr<CommitMessage> expected_commit_message_7 = |
| std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRowGenerator::GenerateRow({std::string("20250326")}, |
| pool_.get()), |
| /*bucket=*/1, |
| /*total_bucket=*/2, DataIncrement({}, {}, {}), CompactIncrement({}, {}, {})); |
| |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_2 = { |
| expected_commit_message_4, expected_commit_message_5, expected_commit_message_6, |
| expected_commit_message_7}; |
| |
| TestHelper::CheckCommitMessages(expected_commit_messages_1, results_1_value); |
| TestHelper::CheckCommitMessages(expected_commit_messages_2, results_2_value); |
| run_complete = true; |
| break; |
| } |
| ASSERT_TRUE(run_complete); |
| } |
| |
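| // Writes into a pre-built append-only table whose schema has been altered: |
| // the new commit must use the latest schema id (1) and stack on top of the |
| // existing snapshots (resulting snapshot id 3). |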
| TEST_F(WriteInteTest, TestAppendTableWriteWithAlterTable) { |
| std::string test_data_path = |
| paimon::test::GetDataDir() + |
| "/orc/append_table_with_alter_table.db/append_table_with_alter_table/"; |
| auto dir = UniqueTestDirectory::Create(); |
| std::string table_path = dir->Str(); |
| ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); |
| arrow::FieldVector fields = { |
| arrow::field("key0", arrow::int32()), arrow::field("key1", arrow::int32()), |
| arrow::field("k", arrow::int32()), arrow::field("c", arrow::int32()), |
| arrow::field("d", arrow::int32()), arrow::field("a", arrow::int32()), |
| arrow::field("e", arrow::int32()), |
| }; |
| std::map<std::string, std::string> options = {{Options::FILE_FORMAT, "orc"}, |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::FILE_SYSTEM, "local"}}; |
| ASSERT_OK_AND_ASSIGN(auto helper, |
| TestHelper::Create(table_path, options, /*is_streaming_mode=*/true)); |
| // scan before any write: expect no splits |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> empty_splits, |
| helper->NewScan(StartupMode::Latest(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_TRUE(empty_splits.empty()); |
| |
| int64_t commit_identifier = 0; |
| auto data_type = arrow::struct_(fields); |
| std::string data = R"([[1, 1, 116, 113, 567, 115, 668]])"; |
| ASSERT_OK_AND_ASSIGN( |
| std::unique_ptr<RecordBatch> batch, |
| TestHelper::MakeRecordBatch(data_type, data, {{"key0", "1"}, {"key1", "1"}}, /*bucket=*/0, |
| /*row_kinds=*/{})); |
| // For an append-only, bucket-unaware table, previously written files are ignored. |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, /*row_count=*/1, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({1, 1, 116, 113, 567, 115, 668}, |
| {1, 1, 116, 113, 567, 115, 668}, {0, 0, 0, 0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/1, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| DataIncrement data_increment({file_meta}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message = std::make_shared<CommitMessageImpl>( |
| BinaryRowGenerator::GenerateRow({1, 1}, pool_.get()), /*bucket=*/0, |
| /*total_bucket=*/-1, data_increment, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages = { |
| expected_commit_message}; |
| ASSERT_OK( |
| helper->WriteAndCommit(std::move(batch), commit_identifier++, expected_commit_messages)); |
| |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot); |
| |
| ASSERT_EQ(1, snapshot.value().SchemaId()); |
| ASSERT_EQ(3, snapshot.value().Id()); |
| |
| // read |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits, helper->Scan()); |
| ASSERT_EQ(data_splits.size(), 1); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type_with_row_kind = arrow::struct_(fields_with_row_kind); |
| std::string expected_data = R"([[0, 1, 1, 116, 113, 567, 115, 668]])"; |
| ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(data_type_with_row_kind, |
| data_splits, expected_data)); |
| ASSERT_TRUE(success); |
| } |
| |
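| // Writes into a pre-built merge-on-read PK table after schema evolution: |
| // the commit must adopt the latest schema id (4) and continue sequence |
| // numbers from the existing data (9), verified by reading _SEQUENCE_NUMBER |
| // back from the written file. |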
| TEST_F(WriteInteTest, TestPKTableWriteWithAlterTable) { |
| std::string test_data_path = |
| paimon::test::GetDataDir() + "/orc/pk_table_with_mor.db/pk_table_with_mor/"; |
| auto dir = UniqueTestDirectory::Create(); |
| std::string table_path = dir->Str(); |
| ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); |
| arrow::FieldVector fields = { |
| arrow::field("k1", arrow::int32(), /*nullable=*/false), |
| arrow::field("k0", arrow::int32(), /*nullable=*/false), |
| arrow::field("p0", arrow::int32(), /*nullable=*/false), |
| arrow::field("p1", arrow::int32(), /*nullable=*/false), |
| arrow::field("s1", arrow::utf8()), |
| arrow::field("s0", arrow::binary()), |
| arrow::field("v0", arrow::int32()), |
| arrow::field("v1", arrow::utf8()), |
| arrow::field("v2", arrow::int32()), |
| }; |
| std::map<std::string, std::string> options = {{Options::FILE_FORMAT, "orc"}, |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::FILE_SYSTEM, "local"}}; |
| ASSERT_OK_AND_ASSIGN(auto helper, |
| TestHelper::Create(table_path, options, /*is_streaming_mode=*/true)); |
| |
| int64_t commit_identifier = 0; |
| auto data_type = arrow::struct_(fields); |
| std::string data = R"([ |
| [0, 0, 0, 0, "apple", "see", 210, "new", 210] |
| ])"; |
| ASSERT_OK_AND_ASSIGN( |
| std::unique_ptr<RecordBatch> batch, |
| TestHelper::MakeRecordBatch(data_type, data, {{"p0", "0"}, {"p1", "0"}}, /*bucket=*/0, |
| /*row_kinds=*/{})); |
| |
| auto min_key = BinaryRowGenerator::GenerateRow({0, 0}, pool_.get()); |
| auto max_key = BinaryRowGenerator::GenerateRow({0, 0}, pool_.get()); |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/1, min_key, max_key, |
| BinaryRowGenerator::GenerateStats({0, 0}, {0, 0}, {0, 0}, pool_.get()), |
| BinaryRowGenerator::GenerateStats( |
| {0, 0, 0, 0, std::string("apple"), NullType(), 210, std::string("new"), 210}, |
| {0, 0, 0, 0, std::string("apple"), NullType(), 210, std::string("new"), 210}, |
| {0, 0, 0, 0, 0, 0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/9, /*max_sequence_number=*/9, /*schema_id=*/4, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| DataIncrement data_increment({file_meta}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message = std::make_shared<CommitMessageImpl>( |
| BinaryRowGenerator::GenerateRow({0, 0}, pool_.get()), /*bucket=*/0, |
| /*total_bucket=*/1, data_increment, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages = { |
| expected_commit_message}; |
| |
| ASSERT_OK_AND_ASSIGN( |
| auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch), commit_identifier++, expected_commit_messages)); |
| |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot); |
| ASSERT_EQ(4, snapshot.value().SchemaId()); |
| |
| auto impl = std::dynamic_pointer_cast<CommitMessageImpl>(commit_msgs[0]); |
| // check the sequence numbers recorded in the commit message |
| ASSERT_EQ(impl->data_increment_.new_files_[0]->min_sequence_number, 9); |
| ASSERT_EQ(impl->data_increment_.new_files_[0]->max_sequence_number, 9); |
| ASSERT_EQ(4, impl->data_increment_.new_files_[0]->schema_id); |
| // check the sequence number and row kind persisted in the data file |
| std::string file_name = |
| table_path + "/p0=0/p1=0/bucket-0/" + impl->data_increment_.new_files_[0]->file_name; |
| ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, file_system_->Open(file_name)); |
| ASSERT_OK_AND_ASSIGN(auto file_format, FileFormatFactory::Get("orc", options)); |
| ASSERT_OK_AND_ASSIGN(auto reader_builder, file_format->CreateReaderBuilder(/*batch_size=*/10)); |
| ASSERT_OK_AND_ASSIGN(auto orc_batch_reader, reader_builder->Build(input_stream)); |
| arrow::FieldVector read_fields; |
| read_fields.push_back(arrow::field("_SEQUENCE_NUMBER", arrow::int64())); |
| read_fields.push_back(arrow::field("_VALUE_KIND", arrow::int8())); |
| auto read_schema = arrow::schema(read_fields); |
| std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>(); |
| auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get()); |
| ASSERT_TRUE(arrow_status.ok()); |
| ASSERT_OK(orc_batch_reader->SetReadSchema(c_schema.get(), /*predicate=*/nullptr, |
| /*selection_bitmap=*/std::nullopt)); |
| ASSERT_OK_AND_ASSIGN(auto result_array, |
| ReadResultCollector::CollectResult(orc_batch_reader.get())); |
| std::shared_ptr<arrow::ChunkedArray> expected_array; |
| auto array_status = |
| arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow::struct_(read_fields), {R"([ |
| [9, 0] |
| ])"}, |
| &expected_array); |
| ASSERT_TRUE(array_status.ok()); |
| ASSERT_TRUE(expected_array->Equals(*result_array)); |
| } |
| |
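| // Fault-injection test: IOHook::Reset(i, RETURN_ERROR) makes the i-th |
| // filesystem operation fail. The loop advances the failure point until a |
| // write-and-prepare-commit pass succeeds, so every injected failure must |
| // leave the table in a state from which a later attempt can still succeed. |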
| TEST_P(WriteInteTest, TestWriteAndCommitIOException) { |
| auto string_field = arrow::field("f0", arrow::utf8()); |
| auto int_field = arrow::field("f1", arrow::int32()); |
| auto int_field1 = arrow::field("f2", arrow::int32()); |
| auto double_field = arrow::field("f3", arrow::float64()); |
| auto schema = |
| arrow::schema(arrow::FieldVector({string_field, int_field, int_field1, double_field})); |
| ::ArrowSchema arrow_schema; |
| ASSERT_TRUE(arrow::ExportSchema(*schema, &arrow_schema).ok()); |
| |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::FILE_FORMAT, file_format}, |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::FILE_SYSTEM, "local"}, |
| {Options::BUCKET, "2"}, |
| {Options::BUCKET_KEY, "f2"}, |
| }; |
| |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| |
| ASSERT_OK_AND_ASSIGN(std::string table_path, |
| CreateTestTable(dir->Str(), /*db_name=*/"foo", |
| /*table_name=*/"bar", &arrow_schema, |
| /*partition_keys=*/{"f1"}, |
| /*primary_keys=*/{}, options)); |
| |
| std::vector<std::shared_ptr<CommitMessage>> commit_messages; |
| bool write_run_complete = false; |
| auto io_hook = IOHook::GetInstance(); |
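| // Arm the hook to fail the i-th IO operation. CHECK_HOOK_STATUS is assumed |
| // to move on to the next i when the injected error surfaces; the loop only |
| // breaks after a run that completes without hitting the failure point. |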
| for (size_t i = 0; i < 200; i++) { |
| ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); |
| io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); |
| WriteContextBuilder context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options).WithStreamingMode(true).Finish()); |
| |
| Result<std::unique_ptr<FileStoreWrite>> write = |
| FileStoreWrite::Create(std::move(write_context)); |
| CHECK_HOOK_STATUS(write.status(), i); |
| auto& file_store_write = write.value(); |
| |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_0, |
| MakeRecordBatch({{"Alice", 10, 1, 11.1}}, {{"f1", "10"}}, 0)); |
| CHECK_HOOK_STATUS(file_store_write->Write(std::move(batch_0)), i); |
| Result<std::vector<std::shared_ptr<CommitMessage>>> results = |
| file_store_write->PrepareCommit(/*wait_compaction=*/false, 0); |
| CHECK_HOOK_STATUS(results.status(), i); |
| commit_messages = results.value(); |
| write_run_complete = true; |
| break; |
| } |
| ASSERT_TRUE(write_run_complete); |
| |
| bool commit_run_complete = false; |
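| // Sample commit-side failure points with a random stride instead of |
| // sweeping all 400 IO operations, to keep the test's runtime bounded. |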
| for (size_t i = 0; i < 400; i += paimon::test::RandomNumber(5, 15)) { |
| ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); |
| io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); |
| CommitContextBuilder commit_context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN( |
| std::unique_ptr<CommitContext> commit_context, |
| commit_context_builder.AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb") |
| .AddOption(Options::FILE_SYSTEM, "local") |
| .IgnoreEmptyCommit(false) |
| .Finish()); |
| auto commit = FileStoreCommit::Create(std::move(commit_context)); |
| CHECK_HOOK_STATUS(commit.status(), i); |
| Result<int32_t> committed = commit.value()->FilterAndCommit({{0, commit_messages}}); |
| CHECK_HOOK_STATUS(committed.status(), i); |
| commit_run_complete = true; |
| break; |
| } |
| ASSERT_TRUE(commit_run_complete); |
| |
| std::vector<int64_t> actual_snapshots; |
| ASSERT_OK(FileUtils::ListVersionedFiles(file_system_, |
| PathUtil::JoinPath(table_path, "snapshot"), |
| SnapshotManager::SNAPSHOT_PREFIX, &actual_snapshots)); |
| std::vector<int64_t> expected_snapshots = {1}; |
| ASSERT_EQ(actual_snapshots, expected_snapshots); |
| } |
| |
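| // Verifies that user-assigned field ids survive a write/read round trip: |
| // the schema recovered from the written data file must reproduce the |
| // original DataField ids, including ids on nested list/struct children. |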
| TEST_P(WriteInteTest, TestWriteWithFieldId) { |
| auto file_format = GetParam(); |
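| // lance and avro are excluded from this field-id round-trip check |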
| if (file_format == "lance" || file_format == "avro") { |
| return; |
| } |
| // prepare write schema and write data |
| auto map_type = arrow::map(arrow::int8(), arrow::int16()); |
| auto list_type = arrow::list(DataField::ConvertDataFieldToArrowField( |
| DataField(536871936, arrow::field("item", arrow::float32())))); |
| std::vector<DataField> struct_fields = {DataField(3, arrow::field("f0", arrow::boolean())), |
| DataField(4, arrow::field("f1", arrow::int64()))}; |
| auto struct_type = DataField::ConvertDataFieldsToArrowStructType(struct_fields); |
| std::vector<DataField> data_fields = { |
| DataField(0, arrow::field("f1", map_type)), |
| DataField(1, arrow::field("f2", list_type)), |
| DataField(2, arrow::field("f3", struct_type)), |
| DataField(5, arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO))), |
| DataField(6, arrow::field("f5", arrow::date32())), |
| DataField(7, arrow::field("f6", arrow::decimal128(2, 2)))}; |
| std::shared_ptr<arrow::DataType> arrow_data_type = |
| DataField::ConvertDataFieldsToArrowStructType(data_fields); |
| auto src_array = std::dynamic_pointer_cast<arrow::StructArray>( |
| arrow::ipc::internal::json::ArrayFromJSON(arrow_data_type, R"([ |
| [[[0, 0]], [0.1, 0.2], [true, 2], "1970-01-01 00:02:03.123123", 2456, "0.22"], |
| [[[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123123", |
| 245, "0.12"], |
| [[[1, 64], [2, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.0", 24, null] |
| ])") |
| .ValueOrDie()); |
| |
| ::ArrowSchema c_schema; |
| ASSERT_TRUE(arrow::ExportType(*arrow_data_type, &c_schema).ok()); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::FILE_SYSTEM, "local"}, |
| }; |
| |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| ASSERT_OK_AND_ASSIGN(std::string table_path, CreateTestTable(dir->Str(), /*db_name=*/"foo", |
| /*table_name=*/"bar", &c_schema, |
| /*partition_keys=*/{}, |
| /*primary_keys=*/{}, options)); |
| |
| WriteContextBuilder context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options).Finish()); |
| |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStoreWrite> file_store_write, |
| FileStoreWrite::Create(std::move(write_context))); |
| ::ArrowArray c_array; |
| ASSERT_TRUE(arrow::ExportArray(*src_array, &c_array).ok()); |
| RecordBatchBuilder batch_builder(&c_array); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, batch_builder.Finish()); |
| ASSERT_OK(file_store_write->Write(std::move(batch))); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<CommitMessage>> commit_messages, |
| file_store_write->PrepareCommit()); |
| ASSERT_OK(file_store_write->Close()); |
| |
| // prepare CommitContext |
| CommitContextBuilder commit_context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<CommitContext> commit_context, |
| commit_context_builder.AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb") |
| .AddOption(Options::FILE_SYSTEM, "local") |
| .IgnoreEmptyCommit(false) |
| .Finish()); |
| // commit |
| ASSERT_OK_AND_ASSIGN(auto commit, FileStoreCommit::Create(std::move(commit_context))); |
| ASSERT_OK(commit->Commit(commit_messages)); |
| |
| // check that the data file carries field-id metadata |
| std::vector<std::unique_ptr<BasicFileStatus>> status_list; |
| ASSERT_OK(file_system_->ListDir(table_path + "/bucket-0/", &status_list)); |
| std::vector<std::string> files; |
| for (const auto& file_status : status_list) { |
| if (!file_status->IsDir()) { |
| files.emplace_back(file_status->GetPath()); |
| } |
| } |
| ASSERT_EQ(files.size(), 1); |
| std::string file_name = files[0]; |
| ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, file_system_->Open(file_name)); |
| ASSERT_OK_AND_ASSIGN(auto file_format_impl, |
| FileFormatFactory::Get(file_format, /*options=*/{})); |
| ASSERT_OK_AND_ASSIGN(auto reader_builder, |
| file_format_impl->CreateReaderBuilder(/*batch_size=*/1024)); |
| ASSERT_OK_AND_ASSIGN(auto file_batch_reader, |
| reader_builder->WithMemoryPool(pool_)->Build(input_stream)); |
| ASSERT_OK_AND_ASSIGN(auto c_file_schema, file_batch_reader->GetFileSchema()); |
| auto arrow_file_schema = arrow::ImportSchema(c_file_schema.get()).ValueOrDie(); |
| ASSERT_OK_AND_ASSIGN(auto write_data_fields, |
| DataField::ConvertArrowSchemaToDataFields(arrow_file_schema)); |
| ASSERT_EQ(write_data_fields, data_fields); |
| } |
| |
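| // With DATA_FILE_EXTERNAL_PATHS set, data files are placed under the |
| // configured external directory (round-robin across paths) instead of the |
| // table location; the test checks the external file exists and reads back. |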
| TEST_P(WriteInteTest, TestAppendTableWriteAndReadWithExternalPath) { |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; |
| |
| arrow::Schema typed_schema(fields); |
| ::ArrowSchema schema; |
| ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| auto external_dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(external_dir); |
| std::string external_test_dir = "FILE://" + external_dir->Str(); |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::FILE_SYSTEM, "local"}, |
| {Options::DATA_FILE_EXTERNAL_PATHS, external_test_dir}, |
| {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "round-robin"}}; |
| // create table |
| ASSERT_OK_AND_ASSIGN(std::string table_path, |
| CreateTestTable(dir->Str(), /*db_name=*/"append_table_with_external_path", |
| /*table_name=*/"append_table_with_external_path", &schema, |
| /*partition_keys=*/{"f1"}, /*primary_keys=*/{}, options)); |
| // write |
| WriteContextBuilder context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options).Finish()); |
| std::string root_path = write_context->GetRootPath(); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStoreWrite> file_store_write, |
| FileStoreWrite::Create(std::move(write_context))); |
| |
| std::shared_ptr<arrow::Array> array = |
| arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ |
| ["Alice", 10, 0, 11.1], |
| ["Bob", 10, 1, 12.1], |
| ["Cathy", 10, 0, 13.1], |
| ["Emily", 10, 0, 14.1] |
| ])") |
| .ValueOrDie(); |
| ::ArrowArray arrow_array; |
| ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); |
| |
| RecordBatchBuilder batch_builder(&arrow_array); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, |
| batch_builder.SetPartition({{"f1", "10"}}).SetBucket(0).Finish()); |
| ASSERT_OK(file_store_write->Write(std::move(batch))); |
| ArrowArrayRelease(&arrow_array); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<CommitMessage>> results, |
| file_store_write->PrepareCommit()); |
| ASSERT_EQ(results.size(), 1); |
| auto commit_msg_impl = std::dynamic_pointer_cast<CommitMessageImpl>(results[0]); |
| auto meta = commit_msg_impl->data_increment_.new_files_[0]; |
| CommitContextBuilder commit_context_builder(root_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<CommitContext> commit_context, |
| commit_context_builder.AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb") |
| .AddOption(Options::FILE_SYSTEM, "local") |
| .IgnoreEmptyCommit(false) |
| .Finish()); |
| ASSERT_OK_AND_ASSIGN(auto commit, FileStoreCommit::Create(std::move(commit_context))); |
| ASSERT_OK(commit->Commit(results, 1)); |
| |
| // check external path |
| ASSERT_OK_AND_ASSIGN(bool file_exist, file_system_->Exists(meta->external_path.value())); |
| ASSERT_TRUE(file_exist); |
| |
| // check read result |
| ScanContextBuilder scan_context_builder(table_path); |
| scan_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| ReadContextBuilder read_context_builder(table_path); |
| read_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); |
| |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); |
| ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| |
| std::shared_ptr<arrow::Array> expected_array = |
| arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ |
| [0, "Alice", 10, 0, 11.1], |
| [0, "Bob", 10, 1, 12.1], |
| [0, "Cathy", 10, 0, 13.1], |
| [0, "Emily", 10, 0, 14.1] |
| ])") |
| .ValueOrDie(); |
| auto expected = std::make_shared<arrow::ChunkedArray>(expected_array); |
| ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); |
| } |
| |
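| // PK-table variant of the external-path test: the committed file's |
| // external_path must exist and is expected to live under the system temp |
| // directory created by UniqueTestDirectory (hence the "tmp" substring). |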
| TEST_P(WriteInteTest, TestPKTableWriteAndReadWithExternalPath) { |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; |
| |
| arrow::Schema typed_schema(fields); |
| ::ArrowSchema schema; |
| ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| auto external_dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(external_dir); |
| std::string external_test_dir = "FILE://" + external_dir->Str(); |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::BUCKET, "1"}, |
| {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::FILE_SYSTEM, "local"}, |
| {Options::DATA_FILE_EXTERNAL_PATHS, external_test_dir}, |
| {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "round-robin"}}; |
| // create table |
| ASSERT_OK_AND_ASSIGN( |
| std::string table_path, |
| CreateTestTable(dir->Str(), /*db_name=*/"pk_table_with_external_path", |
| /*table_name=*/"pk_table_with_external_path", &schema, |
| /*partition_keys=*/{"f1"}, /*primary_keys=*/{"f0", "f1"}, options)); |
| // write |
| WriteContextBuilder context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options).Finish()); |
| std::string root_path = write_context->GetRootPath(); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStoreWrite> file_store_write, |
| FileStoreWrite::Create(std::move(write_context))); |
| |
| std::shared_ptr<arrow::Array> array = |
| arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ |
| ["Alice", 10, 0, 11.1], |
| ["Bob", 10, 1, 12.1], |
| ["Cathy", 10, 0, 13.1], |
| ["Emily", 10, 0, 14.1] |
| ])") |
| .ValueOrDie(); |
| ::ArrowArray arrow_array; |
| ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); |
| |
| RecordBatchBuilder batch_builder(&arrow_array); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, |
| batch_builder.SetPartition({{"f1", "10"}}).SetBucket(0).Finish()); |
| ASSERT_OK(file_store_write->Write(std::move(batch))); |
| ArrowArrayRelease(&arrow_array); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<CommitMessage>> results, |
| file_store_write->PrepareCommit()); |
| ASSERT_EQ(results.size(), 1); |
| auto commit_msg_impl = std::dynamic_pointer_cast<CommitMessageImpl>(results[0]); |
| auto meta = commit_msg_impl->data_increment_.new_files_[0]; |
| |
| // check external path |
| std::string external_path = meta->external_path.value(); |
| ASSERT_OK_AND_ASSIGN(bool file_exist, file_system_->Exists(external_path)); |
| ASSERT_TRUE(file_exist); |
| ASSERT_TRUE(external_path.find("tmp") != std::string::npos); |
| } |
| |
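| // Streaming writes to an external path, exercising boundary values |
| // (numeric_limits min/max, -Inf, empty and non-ASCII strings) and checking |
| // the per-column min/max/null-count stats recorded in DataFileMeta. |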
| TEST_P(WriteInteTest, TestAppendTableStreamWriteWithExternalPath) { |
| auto dir = UniqueTestDirectory::Create(); |
| auto external_dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(external_dir); |
| std::string external_test_dir = "FILE://" + external_dir->Str(); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), |
| arrow::field("f2", arrow::int16()), arrow::field("f3", arrow::int32()), |
| arrow::field("field_null", arrow::int32()), arrow::field("f4", arrow::int64()), |
| arrow::field("f5", arrow::float32()), arrow::field("f6", arrow::float64()), |
| arrow::field("f7", arrow::utf8()), arrow::field("f8", arrow::binary())}; |
| auto schema = arrow::schema(fields); |
| |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::BUCKET, "1"}, |
| {Options::BUCKET_KEY, "f5"}, |
| {Options::FILE_SYSTEM, "local"}, |
| {Options::DATA_FILE_EXTERNAL_PATHS, external_test_dir}, |
| {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "round-robin"}}; |
| |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, |
| /*primary_keys=*/{}, options, /*is_streaming_mode=*/true)); |
| int64_t commit_identifier = 0; |
| |
| std::string data_1 = |
| R"([[true, 0, 32767, 2147483647, null, 4294967295, 0.5, 1.141592659, "20250327", "banana"], |
| [false, 1, 32767, null, null, 4294967296, 1.0, 2.141592658, "20250327", "dog"], |
| [null, 1, 32767, 2147483647, null, null, 2.0, 3.141592657, null, "lucy"], |
| [true, -2, -32768, -2147483648, null, -4294967298, 2.0, 3.141592657, "20250326", "mouse"]])"; |
| |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_1, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data_1, |
| /*partition_map=*/{}, /*bucket=*/0, {})); |
| |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, /*row_count=*/4, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats( |
| {false, static_cast<int8_t>(-2), static_cast<int16_t>(-32768), |
| static_cast<int32_t>(-2147483648), NullType(), static_cast<int64_t>(-4294967298), |
| static_cast<float>(0.5), 1.141592659, std::string("20250326"), NullType()}, |
| {true, static_cast<int8_t>(1), static_cast<int16_t>(32767), |
| static_cast<int32_t>(2147483647), NullType(), static_cast<int64_t>(4294967296), |
| static_cast<float>(2.0), 3.141592657, std::string("20250327"), NullType()}, |
| std::vector<int64_t>({1, 0, 0, 1, 4, 1, 0, 0, 1, 0}), pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/3, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, "FILE:/tmp/xxx", std::nullopt, std::nullopt); |
| file_meta = ReconstructDataFileMeta(file_meta); |
| DataIncrement data_increment({file_meta}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_1 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/1, data_increment, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_1 = { |
| expected_commit_message_1}; |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch_1), commit_identifier++, |
| expected_commit_messages_1)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(4, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(4, snapshot1.value().DeltaRecordCount().value()); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_1, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits_1.size(), 1); |
| std::string expected_data_1 = |
| R"([[0, true, 0, 32767, 2147483647, null, 4294967295, 0.5, 1.141592659, "20250327", "banana"], |
| [0, false, 1, 32767, null, null, 4294967296, 1.0, 2.141592658, "20250327", "dog"], |
| [0, null, 1, 32767, 2147483647, null, null, 2.0, 3.141592657, null, "lucy"], |
| [0, true, -2, -32768, -2147483648, null, -4294967298, 2.0, 3.141592657, "20250326", "mouse"]])"; |
| |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, data_splits_1, expected_data_1)); |
| ASSERT_TRUE(success); |
| |
| std::string data_2 = |
| fmt::format(R"([ |
| [true, {},{},{},null,{},{},{},"∞",""], |
| [false,{},{},{},null,{},{},{}, "",""], |
| [true, 42,-1,999999999,null,123456789012345,0.1,-Inf,"a\u20ACb","binary\ndata"]])", |
| std::numeric_limits<int8_t>::min(), std::numeric_limits<int16_t>::min(), |
| std::numeric_limits<int32_t>::min(), std::numeric_limits<int64_t>::min(), |
| std::numeric_limits<float>::min(), std::numeric_limits<double>::min(), |
| std::numeric_limits<int8_t>::max(), std::numeric_limits<int16_t>::max(), |
| std::numeric_limits<int32_t>::max(), std::numeric_limits<int64_t>::max(), |
| std::numeric_limits<float>::max(), std::numeric_limits<double>::max()); |
| |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_2, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data_2, |
| /*partition_map=*/{}, /*bucket=*/0, {})); |
| |
| auto file_meta_2 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, /*row_count=*/3, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats( |
| {false, std::numeric_limits<int8_t>::min(), std::numeric_limits<int16_t>::min(), |
| std::numeric_limits<int32_t>::min(), NullType(), std::numeric_limits<int64_t>::min(), |
| std::numeric_limits<float>::min(), -std::numeric_limits<double>::infinity(), |
| std::string(""), NullType()}, |
| {true, std::numeric_limits<int8_t>::max(), std::numeric_limits<int16_t>::max(), |
| std::numeric_limits<int32_t>::max(), NullType(), std::numeric_limits<int64_t>::max(), |
| std::numeric_limits<float>::max(), std::numeric_limits<double>::max(), |
| std::string("∞"), NullType()}, |
| std::vector<int64_t>({0, 0, 0, 0, 3, 0, 0, 0, 0, 0}), pool_.get()), |
| /*min_sequence_number=*/4, /*max_sequence_number=*/6, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, "FILE:/tmp/xxx", std::nullopt, std::nullopt); |
| file_meta_2 = ReconstructDataFileMeta(file_meta_2); |
| DataIncrement data_increment_2({file_meta_2}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_2 = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/1, data_increment_2, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_2 = { |
| expected_commit_message_2}; |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs_2, |
| helper->WriteAndCommit(std::move(batch_2), commit_identifier++, |
| expected_commit_messages_2)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot2, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot2); |
| ASSERT_EQ(2, snapshot2.value().Id()); |
| ASSERT_EQ(7, snapshot2.value().TotalRecordCount().value()); |
| ASSERT_EQ(3, snapshot2.value().DeltaRecordCount().value()); |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_2, helper->Scan()); |
| ASSERT_EQ(data_splits_2.size(), 1); |
| std::string expected_data_2 = |
| fmt::format(R"([ |
| [0, true, {},{},{},null,{},{},{},"∞",""], |
| [0, false,{},{},{},null,{},{},{}, "",""], |
| [0, true, 42,-1,999999999,null,123456789012345,0.1,-Inf,"a\u20ACb","binary\ndata"]])", |
| std::numeric_limits<int8_t>::min(), std::numeric_limits<int16_t>::min(), |
| std::numeric_limits<int32_t>::min(), std::numeric_limits<int64_t>::min(), |
| std::numeric_limits<float>::min(), std::numeric_limits<double>::min(), |
| std::numeric_limits<int8_t>::max(), std::numeric_limits<int16_t>::max(), |
| std::numeric_limits<int32_t>::max(), std::numeric_limits<int64_t>::max(), |
| std::numeric_limits<float>::max(), std::numeric_limits<double>::max()); |
| |
| ASSERT_OK_AND_ASSIGN(success, |
| helper->ReadAndCheckResult(data_type, data_splits_2, expected_data_2)); |
| ASSERT_TRUE(success); |
| } |
| |
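| // Exercises partition values that need escaping or special handling: |
| // blank and empty strings, the reserved "__DEFAULT_PARTITION__" marker, |
| // and null, both when writing and when filtering partitions on scan. |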
| TEST_P(WriteInteTest, TestWriteAndReadWithSpecialPartitionValue) { |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64()), |
| arrow::field("dt", arrow::utf8()), arrow::field("hr", arrow::utf8())}; |
| |
| arrow::Schema typed_schema(fields); |
| ::ArrowSchema schema; |
| ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = {{Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::FILE_SYSTEM, "local"}}; |
| // create table |
| ASSERT_OK_AND_ASSIGN(std::string table_path, |
| CreateTestTable(dir->Str(), /*db_name=*/"foo", |
| /*table_name=*/"bar", &schema, |
| /*partition_keys=*/{"dt", "f1", "hr"}, |
| /*primary_keys=*/{}, options)); |
| |
| // write |
| WriteContextBuilder context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options).Finish()); |
| std::string root_path = write_context->GetRootPath(); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStoreWrite> file_store_write, |
| FileStoreWrite::Create(std::move(write_context))); |
| auto write = [&](const std::string& json_array, |
| const std::map<std::string, std::string>& partition) -> void { |
| std::shared_ptr<arrow::Array> array = |
| arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), json_array) |
| .ValueOrDie(); |
| ::ArrowArray arrow_array; |
| ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); |
| |
| RecordBatchBuilder batch_builder(&arrow_array); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, |
| batch_builder.SetPartition(partition).SetBucket(0).Finish()); |
| ASSERT_OK(file_store_write->Write(std::move(batch))); |
| ArrowArrayRelease(&arrow_array); |
| }; |
| |
| write(R"([["Alice", 10, 0, 11.1, " ", "a=b?"]])", {{"f1", "10"}, {"dt", " "}, {"hr", "a=b?"}}); |
| write(R"([["Bob", 10, 0, 12.1, "", "a=b?"]])", {{"f1", "10"}, {"dt", ""}, {"hr", "a=b?"}}); |
| write(R"([["Cathy", 10, 0, 13.1, "__DEFAULT_PARTITION__", "a=b?"]])", |
| {{"f1", "10"}, {"dt", "__DEFAULT_PARTITION__"}, {"hr", "a=b?"}}); |
| write(R"([["David", 10, 0, 13.1, null, "a=b?"]])", |
| {{"f1", "10"}, {"dt", "__DEFAULT_PARTITION__"}, {"hr", "a=b?"}}); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<CommitMessage>> results, |
| file_store_write->PrepareCommit()); |
| ASSERT_EQ(results.size(), 3); |
| auto commit_msg_impl = std::dynamic_pointer_cast<CommitMessageImpl>(results[0]); |
| auto meta = commit_msg_impl->data_increment_.new_files_[0]; |
| CommitContextBuilder commit_context_builder(root_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<CommitContext> commit_context, |
| commit_context_builder.AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb") |
| .AddOption(Options::FILE_SYSTEM, "local") |
| .IgnoreEmptyCommit(false) |
| .Finish()); |
| ASSERT_OK_AND_ASSIGN(auto commit, FileStoreCommit::Create(std::move(commit_context))); |
| ASSERT_OK(commit->Commit(results, 1)); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| { |
| ScanContextBuilder scan_context_builder(table_path); |
| scan_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| ReadContextBuilder read_context_builder(table_path); |
| read_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); |
| |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); |
| ASSERT_OK_AND_ASSIGN(auto read_result, |
| ReadResultCollector::CollectResult(batch_reader.get())); |
| // NOTE: |
| // Users should not use the system-reserved keyword "__DEFAULT_PARTITION__" as a partition |
| // value. If used, it may lead to behavioral inconsistencies between C++ Paimon and Java |
| // Paimon. |
| // |
| // In Java Paimon, "__DEFAULT_PARTITION__" is stored as-is in the `Partition` field of |
| // `DataFileMeta`. In contrast, C++ Paimon treats "__DEFAULT_PARTITION__" as a placeholder |
| // for null and stores a null value in `DataFileMeta`'s partition field. |
| // |
| // Additionally, if users specify "__DEFAULT_PARTITION__" as a PartitionFilter during a |
| // scan, Java Paimon will NOT be able to retrieve the corresponding record, whereas C++ |
| // Paimon WILL retrieve it and treat the partition value as null. |
| // |
| // To ensure consistent behavior across platforms, avoid using "__DEFAULT_PARTITION__" in |
| // data. |
| std::shared_ptr<arrow::Array> array = |
| arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ |
| [0, "Cathy", 10, 0, 13.1, null, "a=b?"], |
| [0, "David", 10, 0, 13.1, null, "a=b?"], |
| [0, "Bob", 10, 0, 12.1, "", "a=b?"], |
| [0, "Alice", 10, 0, 11.1, " ", "a=b?"] |
| ])") |
| .ValueOrDie(); |
| auto expected = std::make_shared<arrow::ChunkedArray>(array); |
| ASSERT_TRUE(expected); |
| auto schema = std::make_shared<arrow::Schema>(fields_with_row_kind); |
| ASSERT_OK_AND_ASSIGN(expected, ReadResultCollector::SortArray(expected, schema)); |
| if (read_result && read_result->length() > 0) { |
| ASSERT_OK_AND_ASSIGN(read_result, ReadResultCollector::SortArray(read_result, schema)); |
| } |
| ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); |
| } |
| { |
| ScanContextBuilder scan_context_builder(table_path); |
| scan_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| std::vector<std::map<std::string, std::string>> partition_filters = { |
| {{"dt", " "}, {"hr", "a=b?"}}}; |
| ASSERT_OK_AND_ASSIGN(auto scan_context, |
| scan_context_builder.SetPartitionFilter(partition_filters).Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| ReadContextBuilder read_context_builder(table_path); |
| read_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); |
| |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); |
| ASSERT_OK_AND_ASSIGN(auto read_result, |
| ReadResultCollector::CollectResult(batch_reader.get())); |
| std::shared_ptr<arrow::Array> array = |
| arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ |
| [0, "Alice", 10, 0, 11.1, " ", "a=b?"] |
| ])") |
| .ValueOrDie(); |
| auto expected = std::make_shared<arrow::ChunkedArray>(array); |
| ASSERT_TRUE(expected); |
| ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); |
| } |
| { |
| ScanContextBuilder scan_context_builder(table_path); |
| scan_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| std::vector<std::map<std::string, std::string>> partition_filters = { |
| {{"dt", ""}, {"hr", "a=b?"}}}; |
| ASSERT_OK_AND_ASSIGN(auto scan_context, |
| scan_context_builder.SetPartitionFilter(partition_filters).Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| ReadContextBuilder read_context_builder(table_path); |
| read_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); |
| |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); |
| ASSERT_OK_AND_ASSIGN(auto read_result, |
| ReadResultCollector::CollectResult(batch_reader.get())); |
| std::shared_ptr<arrow::Array> array = |
| arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ |
| [0, "Bob", 10, 0, 12.1, "", "a=b?"] |
| ])") |
| .ValueOrDie(); |
| auto expected = std::make_shared<arrow::ChunkedArray>(array); |
| ASSERT_TRUE(expected); |
| ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); |
| } |
| { |
| ScanContextBuilder scan_context_builder(table_path); |
| scan_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| std::vector<std::map<std::string, std::string>> partition_filters = { |
| {{"dt", "__DEFAULT_PARTITION__"}, {"hr", "a=b?"}}}; |
| ASSERT_OK_AND_ASSIGN(auto scan_context, |
| scan_context_builder.SetPartitionFilter(partition_filters).Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| ReadContextBuilder read_context_builder(table_path); |
| read_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); |
| |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); |
| ASSERT_OK_AND_ASSIGN(auto read_result, |
| ReadResultCollector::CollectResult(batch_reader.get())); |
| std::shared_ptr<arrow::Array> array = |
| arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ |
| [0, "Cathy", 10, 0, 13.1, null, "a=b?"], |
| [0, "David", 10, 0, 13.1, null, "a=b?"] |
| ])") |
| .ValueOrDie(); |
| auto expected = std::make_shared<arrow::ChunkedArray>(array); |
| ASSERT_TRUE(expected); |
| ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); |
| } |
| } |
| |
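| // Verifies a struct-typed column round-trips through write, commit, and |
| // read, including rows where the whole struct value is null. |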
| TEST_P(WriteInteTest, TestWriteWithNestedSchema) { |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::struct_({arrow::field("v0", arrow::boolean()), |
| arrow::field("v1", arrow::int64())}))}; |
| arrow::Schema typed_schema(fields); |
| ::ArrowSchema schema; |
| ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = {{Options::FILE_FORMAT, file_format}, |
| {Options::FILE_SYSTEM, "local"}}; |
| // create table |
| ASSERT_OK_AND_ASSIGN(std::string table_path, |
| CreateTestTable(dir->Str(), /*db_name=*/"foo", |
| /*table_name=*/"bar", &schema, |
| /*partition_keys=*/{}, /*primary_keys=*/{}, options)); |
| // write |
| WriteContextBuilder context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options).Finish()); |
| std::string root_path = write_context->GetRootPath(); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStoreWrite> file_store_write, |
| FileStoreWrite::Create(std::move(write_context))); |
| |
| std::shared_ptr<arrow::Array> array = |
| arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ |
| [[true, 2]], |
| [null], |
| [[false, 22]], |
| [null] |
| ])") |
| .ValueOrDie(); |
| ::ArrowArray arrow_array; |
| ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); |
| |
| RecordBatchBuilder batch_builder(&arrow_array); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, |
| batch_builder.SetPartition({}).SetBucket(0).Finish()); |
| ASSERT_OK(file_store_write->Write(std::move(batch))); |
| ArrowArrayRelease(&arrow_array); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<CommitMessage>> results, |
| file_store_write->PrepareCommit()); |
| ASSERT_EQ(results.size(), 1); |
| CommitContextBuilder commit_context_builder(root_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<CommitContext> commit_context, |
| commit_context_builder.AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb") |
| .AddOption(Options::FILE_SYSTEM, "local") |
| .IgnoreEmptyCommit(false) |
| .Finish()); |
| ASSERT_OK_AND_ASSIGN(auto commit, FileStoreCommit::Create(std::move(commit_context))); |
| ASSERT_OK(commit->Commit(results, 1)); |
| |
| // check read result |
| ScanContextBuilder scan_context_builder(table_path); |
| scan_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| ReadContextBuilder read_context_builder(table_path); |
| read_context_builder.AddOption(Options::FILE_FORMAT, file_format); |
| ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); |
| |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); |
| ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| std::shared_ptr<arrow::Array> expected_array = |
| arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ |
| [0, [true, 2]], |
| [0, null], |
| [0, [false, 22]], |
| [0, null] |
| ])") |
| .ValueOrDie(); |
| auto expected = std::make_shared<arrow::ChunkedArray>(expected_array); |
| ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); |
| } |
| |
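| // Write-side fault injection only: each attempt uses a fresh table |
| // directory, so a failed attempt cannot leak partial state into the next. |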
| TEST_P(WriteInteTest, TestWriteWithIOException) { |
| auto string_field = arrow::field("f0", arrow::utf8()); |
| auto int_field = arrow::field("f1", arrow::int32()); |
| auto int_field1 = arrow::field("f2", arrow::int32()); |
| auto double_field = arrow::field("f3", arrow::float64()); |
| auto schema = |
| arrow::schema(arrow::FieldVector({string_field, int_field, int_field1, double_field})); |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::FILE_FORMAT, file_format}, |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::FILE_SYSTEM, "local"}, |
| {Options::BUCKET, "2"}, |
| {Options::BUCKET_KEY, "f2"}, |
| }; |
| |
| std::vector<std::shared_ptr<CommitMessage>> commit_messages; |
| bool write_run_complete = false; |
| auto io_hook = IOHook::GetInstance(); |
| for (size_t i = 0; i < 200; i++) { |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| ::ArrowSchema arrow_schema; |
| ASSERT_TRUE(arrow::ExportSchema(*schema, &arrow_schema).ok()); |
| ASSERT_OK_AND_ASSIGN(std::string table_path, |
| CreateTestTable(dir->Str(), /*db_name=*/"foo", |
| /*table_name=*/"bar", &arrow_schema, |
| /*partition_keys=*/{"f1"}, |
| /*primary_keys=*/{}, options)); |
| ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); |
| io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); |
| |
| WriteContextBuilder context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options).WithStreamingMode(true).Finish()); |
| |
| Result<std::unique_ptr<FileStoreWrite>> write = |
| FileStoreWrite::Create(std::move(write_context)); |
| CHECK_HOOK_STATUS(write.status(), i); |
| auto& file_store_write = write.value(); |
| |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_0, |
| MakeRecordBatch({{"Alice", 10, 1, 11.1}}, {{"f1", "10"}}, 0)); |
| CHECK_HOOK_STATUS(file_store_write->Write(std::move(batch_0)), i); |
| Result<std::vector<std::shared_ptr<CommitMessage>>> results = |
| file_store_write->PrepareCommit(/*wait_compaction=*/false, 0); |
| CHECK_HOOK_STATUS(results.status(), i); |
| commit_messages = results.value(); |
| write_run_complete = true; |
| break; |
| } |
| ASSERT_TRUE(write_run_complete); |
| ASSERT_EQ(commit_messages.size(), 1); |
| } |
| |
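| // Commit-side fault injection: each attempt commits into a fresh copy of |
| // the table, and a completed commit must leave exactly snapshot 1 behind. |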
| TEST_P(WriteInteTest, TestCommitWithIOException) { |
| auto string_field = arrow::field("f0", arrow::utf8()); |
| auto int_field = arrow::field("f1", arrow::int32()); |
| auto int_field1 = arrow::field("f2", arrow::int32()); |
| auto double_field = arrow::field("f3", arrow::float64()); |
| auto schema = |
| arrow::schema(arrow::FieldVector({string_field, int_field, int_field1, double_field})); |
| ::ArrowSchema arrow_schema; |
| ASSERT_TRUE(arrow::ExportSchema(*schema, &arrow_schema).ok()); |
| |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::FILE_FORMAT, file_format}, |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::FILE_SYSTEM, "local"}, |
| {Options::BUCKET, "2"}, |
| {Options::BUCKET_KEY, "f2"}, |
| }; |
| |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| |
| ASSERT_OK_AND_ASSIGN(std::string table_path, |
| CreateTestTable(dir->Str(), /*db_name=*/"foo", |
| /*table_name=*/"bar", &arrow_schema, |
| /*partition_keys=*/{"f1"}, |
| /*primary_keys=*/{}, options)); |
| // write data |
| WriteContextBuilder context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options).WithStreamingMode(true).Finish()); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStoreWrite> file_store_write, |
| FileStoreWrite::Create(std::move(write_context))); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_0, |
| MakeRecordBatch({{"Alice", 10, 1, 11.1}}, {{"f1", "10"}}, 0)); |
| ASSERT_OK(file_store_write->Write(std::move(batch_0))); |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<CommitMessage>> commit_messages, |
| file_store_write->PrepareCommit(/*wait_compaction=*/false, 0)); |
| |
| // commit data |
| bool commit_run_complete = false; |
| auto io_hook = IOHook::GetInstance(); |
| for (size_t i = 0; i < 400; i++) { |
| auto tmp_dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(TestUtil::CopyDirectory(table_path, tmp_dir->Str())); |
| std::string tmp_table_path = tmp_dir->Str(); |
| ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); |
| io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); |
| CommitContextBuilder commit_context_builder(tmp_table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN( |
| std::unique_ptr<CommitContext> commit_context, |
| commit_context_builder.AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb") |
| .AddOption(Options::FILE_SYSTEM, "local") |
| .IgnoreEmptyCommit(false) |
| .Finish()); |
| auto commit = FileStoreCommit::Create(std::move(commit_context)); |
| CHECK_HOOK_STATUS(commit.status(), i); |
| Result<int32_t> committed = commit.value()->FilterAndCommit({{0, commit_messages}}); |
| CHECK_HOOK_STATUS(committed.status(), i); |
| commit_run_complete = true; |
| // once the commit completes, verify that the snapshot exists |
| io_hook->Clear(); |
| std::vector<int64_t> actual_snapshots; |
| ASSERT_OK(FileUtils::ListVersionedFiles( |
| file_system_, PathUtil::JoinPath(tmp_table_path, "snapshot"), |
| SnapshotManager::SNAPSHOT_PREFIX, &actual_snapshots)); |
| std::vector<int64_t> expected_snapshots = {1}; |
| ASSERT_EQ(actual_snapshots, expected_snapshots); |
| break; |
| } |
| ASSERT_TRUE(commit_run_complete); |
| } |
| |
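| // Verifies the writer returns all memory to its pool: after the writer |
| // goes out of scope, peak usage must be non-zero (the pool was used) |
| // while current usage must drop back to zero. |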
| TEST_P(WriteInteTest, TestWriteMemoryUse) { |
| auto string_field = arrow::field("f0", arrow::utf8()); |
| auto int_field = arrow::field("f1", arrow::int32()); |
| auto int_field1 = arrow::field("f2", arrow::int32()); |
| auto double_field = arrow::field("f3", arrow::float64()); |
| auto schema = |
| arrow::schema(arrow::FieldVector({string_field, int_field, int_field1, double_field})); |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::FILE_FORMAT, file_format}, |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::FILE_SYSTEM, "local"}, |
| {Options::BUCKET, "2"}, |
| {Options::BUCKET_KEY, "f2"}, |
| }; |
| |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| ::ArrowSchema arrow_schema; |
| ASSERT_TRUE(arrow::ExportSchema(*schema, &arrow_schema).ok()); |
| ASSERT_OK_AND_ASSIGN(std::string table_path, |
| CreateTestTable(dir->Str(), /*db_name=*/"foo", |
| /*table_name=*/"bar", &arrow_schema, |
| /*partition_keys=*/{"f1"}, |
| /*primary_keys=*/{}, options)); |
| std::shared_ptr<MemoryPool> write_pool = GetMemoryPool(); |
| { |
| WriteContextBuilder context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options) |
| .WithMemoryPool(write_pool) |
| .WithStreamingMode(true) |
| .Finish()); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStoreWrite> file_store_write, |
| FileStoreWrite::Create(std::move(write_context))); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_0, |
| MakeRecordBatch({{"Alice", 10, 1, 11.1}}, {{"f1", "10"}}, 0)); |
| ASSERT_OK(file_store_write->Write(std::move(batch_0))); |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<CommitMessage>> commit_messages, |
| file_store_write->PrepareCommit(/*wait_compaction=*/false, 0)); |
| ASSERT_EQ(commit_messages.size(), 1); |
| } |
| // check that all memory has been released back to the pool |
| ASSERT_GT(write_pool->MaxMemoryUsage(), 0); |
| ASSERT_EQ(write_pool->CurrentUsage(), 0); |
| } |
| |
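| // Writes a single all-null row and checks the min/max/null-count stats |
| // hash codes against the values produced by Java Paimon. |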
| TEST_P(WriteInteTest, TestAppendTableWithAllNull) { |
| if (GetParam() == "lance") { |
| // lance does not support the map type |
| return; |
| } |
| auto dir = UniqueTestDirectory::Create(); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::boolean()), |
| arrow::field("f1", arrow::int8()), |
| arrow::field("f2", arrow::int16()), |
| arrow::field("f3", arrow::int32()), |
| arrow::field("f4", arrow::int64()), |
| arrow::field("f5", arrow::float32()), |
| arrow::field("f6", arrow::float64()), |
| arrow::field("f7", arrow::utf8()), |
| arrow::field("f8", arrow::binary()), |
| arrow::field("f9", arrow::map(arrow::int8(), arrow::int16())), |
| arrow::field("f10", arrow::list(arrow::float32())), |
| arrow::field("f11", arrow::struct_({arrow::field("f0", arrow::boolean()), |
| arrow::field("f1", arrow::int64())})), |
| arrow::field("f12", arrow::timestamp(arrow::TimeUnit::NANO)), |
| arrow::field("f13", arrow::date32()), |
| arrow::field("f14", arrow::decimal128(2, 2)), |
| arrow::field("f15", arrow::decimal128(30, 2))}; |
| auto schema = arrow::schema(fields); |
| |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "-1"}, |
| {Options::FILE_SYSTEM, "local"}, |
| }; |
| |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, |
| /*primary_keys=*/{}, options, /*is_streaming_mode=*/false)); |
| int64_t commit_identifier = 0; |
| |
| std::string data = |
| R"([[null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]])"; |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data, |
| /*partition_map=*/{}, /*bucket=*/0, {})); |
| ASSERT_OK_AND_ASSIGN( |
| auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch), commit_identifier++, std::nullopt)); |
| ASSERT_EQ(commit_msgs.size(), 1); |
| auto msg = std::dynamic_pointer_cast<CommitMessageImpl>(commit_msgs[0]); |
| ASSERT_EQ(msg->data_increment_.new_files_.size(), 1); |
| auto stats = msg->data_increment_.new_files_[0]->value_stats; |
| |
| // check stats hash codes are compatible with Java Paimon |
| ASSERT_EQ(stats.min_values_.HashCode(), 0xc7883013); |
| ASSERT_EQ(stats.max_values_.HashCode(), 0xc7883013); |
| if (GetParam() != "parquet" && GetParam() != "avro") { |
| ASSERT_EQ(stats.null_counts_.HashCode(), 0x5ddc482d); |
| } |
| } |
| |
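| // BUCKET = "-2" selects postpone-bucket mode: the batch carries no bucket |
| // id (int32 min sentinel) and the commit message records bucket and |
| // total_bucket as -2, leaving bucket assignment to a later stage. |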
| TEST_P(WriteInteTest, TestPkTablePostponeBucket) { |
| auto dir = UniqueTestDirectory::Create(); |
| ASSERT_TRUE(dir); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; |
| auto schema = arrow::schema(fields); |
| std::vector<std::string> primary_keys = {"f0", "f1"}; |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = {{Options::MANIFEST_FORMAT, "orc"}, |
| {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::BUCKET, "-2"}, |
| {Options::FILE_SYSTEM, "local"}}; |
| ASSERT_OK_AND_ASSIGN(auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, |
| primary_keys, options, |
| /*is_streaming_mode=*/false)); |
| int64_t commit_identifier = 0; |
| ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<TableSchema>> table_schema, |
| helper->LatestSchema()); |
| ASSERT_TRUE(table_schema); |
| |
| std::string data = R"([ |
| ["Alice", 50, 0, 11.1], |
| ["Paul", 20, 1, 12.1], |
| ["Cathy", 30, 0, 13.1], |
| ["Emily", 5, 0, 14.1], |
| ["Cathy", 30, 0, 13.1] |
| ])"; |
| // test a batch with no bucket id specified (int32 min sentinel) |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, |
| TestHelper::MakeRecordBatch( |
| arrow::struct_(fields), data, |
| /*partition_map=*/{}, /*bucket=*/std::numeric_limits<int32_t>::min(), |
| {RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT, |
| RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT, |
| RecordBatch::RowKind::DELETE})); |
| |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, |
| /*row_count=*/5, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Alice"), 50}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Cathy"), 30}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({NullType(), NullType()}, {NullType(), NullType()}, |
| {-1, -1}, pool_.get()), |
| /*value_stats=*/ |
| SimpleStats::EmptyStats(), |
| /*min_sequence_number=*/-1, /*max_sequence_number=*/-1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/1, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| DataIncrement data_increment({file_meta}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message = std::make_shared<CommitMessageImpl>( |
| /*partition_map=*/BinaryRow::EmptyRow(), /*bucket=*/-2, |
| /*total_bucket=*/-2, data_increment, CompactIncrement({}, {}, {})); |
| |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages = { |
| expected_commit_message}; |
| |
| ASSERT_OK_AND_ASSIGN( |
| auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch), commit_identifier++, expected_commit_messages)); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(5, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(5, snapshot1.value().DeltaRecordCount().value()); |
| |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits.size(), 1); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| std::string expected_data = R"([ |
| [0, "Alice", 50, 0, 11.1], |
| [0, "Paul", 20, 1, 12.1], |
| [0, "Cathy", 30, 0, 13.1], |
| [0, "Emily", 5, 0, 14.1], |
| [3, "Cathy", 30, 0, 13.1] |
| ])"; |
| |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, data_splits, expected_data)); |
| ASSERT_TRUE(success); |
| } |
| |
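| // Writes to the 'rt' branch of a copied test table via WithBranch, then |
| // to the main branch, and checks the expected commit message for each. |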
| TEST_F(WriteInteTest, TestBranchWrite) { |
| arrow::FieldVector fields = {arrow::field("dt", arrow::utf8()), |
| arrow::field("name", arrow::utf8()), |
| arrow::field("amount", arrow::int32())}; |
| std::string test_data_path = paimon::test::GetDataDir() + |
| "/orc/append_table_with_rt_branch.db/append_table_with_rt_branch/"; |
| auto dir = UniqueTestDirectory::Create(); |
| std::string table_path = dir->Str(); |
| ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); |
| |
| std::map<std::string, std::string> options = {{Options::FILE_FORMAT, "orc"}, |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::FILE_SYSTEM, "local"}}; |
| WriteContextBuilder context_builder(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, |
| context_builder.SetOptions(options).WithBranch("rt").Finish()); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStoreWrite> file_store_write, |
| FileStoreWrite::Create(std::move(write_context))); |
| |
| // write to rt branch, pk table |
| std::string data_0 = |
| R"([["20240726", "watermelon", 23], |
| ["20240726", "orange", 12] |
| ])"; |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_0, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data_0, |
| /*partition_map=*/ |
| {{"dt", "20240726"}}, /*bucket=*/0, {})); |
| ASSERT_OK(file_store_write->Write(std::move(batch_0))); |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<CommitMessage>> results_0, |
| file_store_write->PrepareCommit(/*wait_compaction=*/false, 0)); |
| ASSERT_EQ(results_0.size(), 1); |
| auto result_commit_msgs = std::dynamic_pointer_cast<CommitMessageImpl>(results_0[0]); |
| ASSERT_TRUE(result_commit_msgs); |
| auto file_meta_0 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, /*row_count=*/2, |
| /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("orange")}, pool_.get()), |
| /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("watermelon")}, pool_.get()), |
| /*key_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("orange")}, {std::string("watermelon")}, {0}, |
| pool_.get()), |
| BinaryRowGenerator::GenerateStats({std::string("20240726"), std::string("orange"), 12}, |
| {std::string("20240726"), std::string("watermelon"), 23}, |
| std::vector<int64_t>({0, 0, 0}), pool_.get()), |
| /*min_sequence_number=*/1, /*max_sequence_number=*/2, /*schema_id=*/1, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| DataIncrement data_increment_0({file_meta_0}, {}, {}); |
| auto expected_commit_message_0 = std::make_shared<CommitMessageImpl>( |
| BinaryRowGenerator::GenerateRow({std::string("20240726")}, pool_.get()), /*bucket=*/0, |
| /*total_bucket=*/2, data_increment_0, CompactIncrement({}, {}, {})); |
| ASSERT_TRUE(expected_commit_message_0->TEST_Equal(*result_commit_msgs)); |
| |
| // write to main branch, append table |
| WriteContextBuilder context_builder1(table_path, "commit_user_1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context1, |
| context_builder1.SetOptions(options).Finish()); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStoreWrite> file_store_write1, |
| FileStoreWrite::Create(std::move(write_context1))); |
| std::string data_1 = |
| R"([["20240725", "watermelon", 3], |
| ["20240725", "orange", 2] |
| ])"; |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_1, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data_1, |
| /*partition_map=*/ |
| {{"dt", "20240725"}}, /*bucket=*/0, {})); |
| ASSERT_OK(file_store_write1->Write(std::move(batch_1))); |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<CommitMessage>> results_1, |
| file_store_write1->PrepareCommit(/*wait_compaction=*/false, 0)); |
| ASSERT_EQ(results_1.size(), 1); |
| auto result_commit_msgs1 = std::dynamic_pointer_cast<CommitMessageImpl>(results_1[0]); |
| ASSERT_TRUE(result_commit_msgs1); |
| auto file_meta_1 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/724, /*row_count=*/2, |
| /*min_key=*/BinaryRow::EmptyRow(), |
| /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/ |
| SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("20240725"), std::string("orange"), 2}, |
| {std::string("20240725"), std::string("watermelon"), 3}, |
| std::vector<int64_t>({0, 0, 0}), pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/1, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| DataIncrement data_increment_1({file_meta_1}, {}, {}); |
| auto expected_commit_message_1 = std::make_shared<CommitMessageImpl>( |
| BinaryRowGenerator::GenerateRow({std::string("20240725")}, pool_.get()), /*bucket=*/0, |
| /*total_bucket=*/-1, data_increment_1, CompactIncrement({}, {}, {})); |
| ASSERT_TRUE(expected_commit_message_1->TEST_Equal(*result_commit_msgs1)); |
| } |
| |
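// Exercises data-evolution writes: each write may cover only a subset of the table's
// columns (write_cols), and files covering the same rows are linked by first_row_id.
// The helpers below write an array, check the produced commit message, commit it, and
// verify the per-file meta recorded in the committed snapshot.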
| TEST_P(WriteInteTest, TestDataEvolutionWrite) { |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int8()), |
| arrow::field("f2", arrow::int16()), arrow::field("f3", arrow::int32()), |
| arrow::field("f4", arrow::int64()), arrow::field("f5", arrow::float64())}; |
| auto typed_schema = arrow::schema(fields); |
| ::ArrowSchema c_schema; |
| ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &c_schema).ok()); |
| |
| auto file_format = GetParam(); |
| auto dir = UniqueTestDirectory::Create(); |
| std::map<std::string, std::string> options = {{Options::FILE_FORMAT, file_format}, |
| {Options::MANIFEST_FORMAT, "orc"}, |
| {Options::ROW_TRACKING_ENABLED, "true"}, |
| {Options::DATA_EVOLUTION_ENABLED, "true"}, |
| {Options::FILE_SYSTEM, "local"}}; |
| |
| // create table |
| ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); |
| ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); |
| ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &c_schema, /*partition_keys=*/{}, |
| /*primary_keys=*/{}, options, |
| /*ignore_if_exists=*/false)); |
| std::string table_path = PathUtil::JoinPath(dir->Str(), "foo.db/bar"); |
| std::string commit_user = "commit_user_1"; |
    auto write_array = [&](const std::vector<std::string>& write_cols,
                           const std::shared_ptr<arrow::Array>& array)
| -> Result<std::vector<std::shared_ptr<CommitMessage>>> { |
| WriteContextBuilder builder(table_path, commit_user); |
| builder.WithWriteSchema(write_cols); |
| PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<WriteContext> write_context, builder.Finish()); |
| PAIMON_ASSIGN_OR_RAISE(auto file_store_write, |
| FileStoreWrite::Create(std::move(write_context))); |
| ArrowArray c_array; |
        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &c_array));
| auto record_batch = std::make_unique<RecordBatch>( |
| /*partition_map=*/std::map<std::string, std::string>(), /*bucket=*/0, |
| /*row_kinds=*/std::vector<RecordBatch::RowKind>(), &c_array); |
| PAIMON_RETURN_NOT_OK(file_store_write->Write(std::move(record_batch))); |
| PAIMON_ASSIGN_OR_RAISE(auto commit_msgs, |
| file_store_write->PrepareCommit( |
| /*wait_compaction=*/false, /*commit_identifier=*/0)); |
| PAIMON_RETURN_NOT_OK(file_store_write->Close()); |
| return commit_msgs; |
| }; |
| |
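    // check_meta: compares a resulting commit message against the expected file metas,
    // each rebuilt via ReconstructDataFileMeta before comparison.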
| auto check_meta = [&](const std::shared_ptr<CommitMessage>& result_commit_msg, |
| std::vector<std::shared_ptr<DataFileMeta>> expected_file_metas) { |
| auto result_commit_msg_impl = |
| std::dynamic_pointer_cast<CommitMessageImpl>(result_commit_msg); |
| for (auto& file_meta : expected_file_metas) { |
| file_meta = ReconstructDataFileMeta(file_meta); |
| } |
| DataIncrement data_increment(std::move(expected_file_metas), {}, {}); |
| auto expected_commit_message = std::make_shared<CommitMessageImpl>( |
| BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*total_bucket=*/-1, data_increment, CompactIncrement({}, {}, {})); |
| ASSERT_TRUE(expected_commit_message->TEST_Equal(*result_commit_msg_impl)); |
| }; |
| |
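    // commit: commits the messages, then verifies the id and next row id of the latest
    // snapshot created by this commit user.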
| auto commit = [&](const std::vector<std::shared_ptr<CommitMessage>>& commit_msgs, |
| int64_t latest_snapshot_id, int64_t next_row_id) { |
| CommitContextBuilder builder(table_path, commit_user); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<CommitContext> commit_context, builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStoreCommit> file_store_commit, |
| FileStoreCommit::Create(std::move(commit_context))); |
| ASSERT_OK(file_store_commit->Commit(commit_msgs)); |
| |
| auto commit_impl = dynamic_cast<FileStoreCommitImpl*>(file_store_commit.get()); |
| ASSERT_OK_AND_ASSIGN(auto latest_snapshot, |
| commit_impl->snapshot_manager_->LatestSnapshotOfUser(commit_user)); |
| ASSERT_TRUE(latest_snapshot); |
| ASSERT_EQ(latest_snapshot.value().Id(), latest_snapshot_id); |
| ASSERT_EQ(latest_snapshot.value().NextRowId(), next_row_id); |
| }; |
| // first_row_id, write_cols, min_sequence_number, max_sequence_number |
| using MetaType = std::tuple<int64_t, std::optional<std::vector<std::string>>, int64_t, int64_t>; |
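    // check_committed_meta: scans the table and checks, for every data file of the
    // single resulting split, the meta fields listed in MetaType.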
| auto check_committed_meta = [&](const std::vector<MetaType>& expected_meta) { |
| ScanContextBuilder builder(table_path); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<ScanContext> scan_context, builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<TableScan> table_scan, |
| TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(std::shared_ptr<Plan> plan, table_scan->CreatePlan()); |
| ASSERT_EQ(plan->Splits().size(), 1); |
| auto data_split = std::dynamic_pointer_cast<DataSplitImpl>(plan->Splits()[0]); |
| ASSERT_EQ(data_split->DataFiles().size(), expected_meta.size()); |
| for (size_t i = 0; i < expected_meta.size(); i++) { |
| const auto& [first_row_id, write_cols, min_sequence_number, max_sequence_number] = |
| expected_meta[i]; |
| const auto& result_meta = data_split->DataFiles()[i]; |
| ASSERT_EQ(result_meta->first_row_id.value(), first_row_id); |
| ASSERT_EQ(result_meta->write_cols, write_cols); |
| ASSERT_EQ(result_meta->min_sequence_number, min_sequence_number); |
| ASSERT_EQ(result_meta->max_sequence_number, max_sequence_number); |
| } |
| }; |
| // write field: f0, f1, f2, f3, f4, f5 |
| std::vector<std::string> write_cols1 = typed_schema->field_names(); |
| auto src_array1 = std::dynamic_pointer_cast<arrow::StructArray>( |
| arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ |
| ["David", 5, 12, null, 2000, null], |
| ["Lucy", 2, null, 1, null, null] |
| ])") |
| .ValueOrDie()); |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs1, write_array(write_cols1, src_array1)); |
| ASSERT_EQ(commit_msgs1.size(), 1); |
    // write_cols in the data file meta is omitted when the full column set is written
| auto file_meta1 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/100, /*row_count=*/2, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("David"), static_cast<int8_t>(2), |
| static_cast<int16_t>(12), 1, 2000l, NullType()}, |
| {std::string("Lucy"), static_cast<int8_t>(5), |
| static_cast<int16_t>(12), 1, 2000l, NullType()}, |
| std::vector<int64_t>({0, 0, 1, 1, 1, 2}), pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| check_meta(commit_msgs1[0], {file_meta1}); |
| commit(commit_msgs1, /*latest_snapshot_id=*/1, /*next_row_id=*/2); |
| check_committed_meta({{0, std::nullopt, 1, 1}}); |
| |
| // write field: f0, f2, f4, f5 |
| std::vector<std::string> write_cols2 = {"f0", "f2", "f4", "f5"}; |
| auto src_array2 = std::dynamic_pointer_cast<arrow::StructArray>( |
| arrow::ipc::internal::json::ArrayFromJSON( |
| arrow::struct_({fields[0], fields[2], fields[4], fields[5]}), R"([ |
| ["Alice", 50, null, 11.1], |
| ["Paul", 20, 1, null], |
| ["Cathy", 30, 0, 13.1], |
| ["Emily", 5, 0, 14.1], |
| ["Cathy", 30, 0, null] |
| ])") |
| .ValueOrDie()); |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs2, write_array(write_cols2, src_array2)); |
| ASSERT_EQ(commit_msgs2.size(), 1); |
| auto file_meta2 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/100, /*row_count=*/5, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Alice"), static_cast<int16_t>(5), 0l, 11.1}, |
| {std::string("Paul"), static_cast<int16_t>(50), 1l, 14.1}, |
| std::vector<int64_t>({0, 0, 1, 2}), pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/4, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/write_cols2); |
| check_meta(commit_msgs2[0], {file_meta2}); |
| commit(commit_msgs2, /*latest_snapshot_id=*/2, /*next_row_id=*/7); |
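    // the second file's rows start at first_row_id 2, right after the two rows written
    // in the first commit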
| check_committed_meta({{0, std::nullopt, 1, 1}, {2, write_cols2, 2, 2}}); |
    // write field: f0 only, patching the rows already covered by file_meta1
| std::vector<std::string> write_cols3 = {"f0"}; |
| auto src_array3 = std::dynamic_pointer_cast<arrow::StructArray>( |
| arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields[0]}), R"([ |
| ["David3"], |
| ["Lucy3"] |
| ])") |
| .ValueOrDie()); |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs3, write_array(write_cols3, src_array3)); |
| ASSERT_EQ(commit_msgs3.size(), 1); |
| auto file_meta3 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/100, /*row_count=*/2, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("David3")}, {std::string("Lucy3")}, |
| std::vector<int64_t>({0}), pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/write_cols3); |
| check_meta(commit_msgs3[0], {file_meta3}); |
| auto commit_msg_impl = std::dynamic_pointer_cast<CommitMessageImpl>(commit_msgs3[0]); |
| ASSERT_TRUE(commit_msg_impl); |
    // align with file_meta1's rows by reusing its first_row_id
| commit_msg_impl->data_increment_.new_files_[0]->AssignFirstRowId(0); |
| commit(commit_msgs3, /*latest_snapshot_id=*/3, /*next_row_id=*/7); |
    // in scan, DataFileMeta entries are sorted by first row id, then by sequence number
| check_committed_meta({ |
| {0, std::nullopt, 1, 1}, |
| {0, write_cols3, 3, 3}, |
| {2, write_cols2, 2, 2}, |
| }); |
| } |
| |
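// With BLOB_AS_DESCRIPTOR enabled, blob values arrive as descriptors and the blob
// column is written to separate .blob data files, while the remaining columns go to a
// regular data file.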
| TEST_P(WriteInteTest, TestAppendTableWriteWithBlobType) { |
| auto dir = UniqueTestDirectory::Create(); |
| arrow::FieldVector fields = {arrow::field("f0", arrow::utf8()), |
| arrow::field("f1", arrow::int32()), |
| BlobUtils::ToArrowField("blob", false)}; |
| auto schema = arrow::schema(fields); |
| |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "700"}, {Options::BUCKET, "-1"}, |
| {Options::ROW_TRACKING_ENABLED, "true"}, {Options::DATA_EVOLUTION_ENABLED, "true"}, |
| {Options::FILE_SYSTEM, "local"}, {Options::BLOB_AS_DESCRIPTOR, "true"}}; |
| |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, |
| /*primary_keys=*/{}, options, /*is_streaming_mode=*/true)); |
| int64_t commit_identifier = 0; |
| |
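    // Builds a struct array of (f0: utf8, f1: int32, blob: large_binary descriptors);
    // every third f1 value is null to exercise null handling.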
| auto generate_blob_array = [&](const std::vector<PAIMON_UNIQUE_PTR<Bytes>>& blob_descriptors) |
| -> std::shared_ptr<arrow::Array> { |
| arrow::StructBuilder struct_builder( |
| arrow::struct_(fields), arrow::default_memory_pool(), |
| {std::make_shared<arrow::StringBuilder>(), std::make_shared<arrow::Int32Builder>(), |
| std::make_shared<arrow::LargeBinaryBuilder>()}); |
| auto string_builder = dynamic_cast<arrow::StringBuilder*>(struct_builder.field_builder(0)); |
| auto int_builder = dynamic_cast<arrow::Int32Builder*>(struct_builder.field_builder(1)); |
| auto binary_builder = |
| dynamic_cast<arrow::LargeBinaryBuilder*>(struct_builder.field_builder(2)); |
| for (size_t i = 0; i < blob_descriptors.size(); ++i) { |
| EXPECT_TRUE(struct_builder.Append().ok()); |
| EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok()); |
| if (i % 3 == 0) { |
| // test null |
| EXPECT_TRUE(int_builder->AppendNull().ok()); |
| } else { |
| EXPECT_TRUE(int_builder->Append(i).ok()); |
| } |
| EXPECT_TRUE( |
| binary_builder->Append(blob_descriptors[i]->data(), blob_descriptors[i]->size()) |
| .ok()); |
| } |
| std::shared_ptr<arrow::Array> array; |
| EXPECT_TRUE(struct_builder.Finish(&array).ok()); |
| return array; |
| }; |
| |
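    // Build four blob descriptors referencing existing test data files, either a whole
    // file or a byte range of one.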
| std::vector<PAIMON_UNIQUE_PTR<Bytes>> blob_descriptors; |
| std::string file1 = paimon::test::GetDataDir() + "/avro/data/avro_with_null"; |
| ASSERT_OK_AND_ASSIGN(auto blob1, Blob::FromPath(file1)); |
| blob_descriptors.emplace_back(blob1->ToDescriptor(pool_)); |
| |
| std::string file2 = paimon::test::GetDataDir() + "/xxhash.data"; |
| ASSERT_OK_AND_ASSIGN(auto blob2, Blob::FromPath(file2, /*offset=*/0, /*length=*/91)); |
| blob_descriptors.emplace_back(blob2->ToDescriptor(pool_)); |
| ASSERT_OK_AND_ASSIGN(auto blob3, Blob::FromPath(file2, /*offset=*/92, /*length=*/85)); |
| blob_descriptors.emplace_back(blob3->ToDescriptor(pool_)); |
| ASSERT_OK_AND_ASSIGN(auto blob4, Blob::FromPath(file2, /*offset=*/300, /*length=*/3000)); |
| blob_descriptors.emplace_back(blob4->ToDescriptor(pool_)); |
| |
| auto array = generate_blob_array(blob_descriptors); |
| ::ArrowArray arrow_array; |
| ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); |
| RecordBatchBuilder batch_builder(&arrow_array); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, batch_builder.Finish()); |
| |
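    // Expected layout: one regular data file carrying (f0, f1) plus two .blob files for
    // the blob column, rolled over because the last blob alone exceeds TARGET_FILE_SIZE.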
| auto file_meta1 = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/405, /*row_count=*/4, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("str_0"), 1}, {std::string("str_3"), 2}, |
| std::vector<int64_t>({0, 2}), pool_.get()), |
| /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, /*first_row_id=*/0, |
| /*write_cols=*/std::vector<std::string>({"f0", "f1"})); |
| file_meta1 = ReconstructDataFileMeta(file_meta1); |
| auto file_meta2 = std::make_shared<DataFileMeta>( |
| "data-xxx.blob", /*file_size=*/764, /*row_count=*/3, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({NullType()}, {NullType()}, std::vector<int64_t>({0}), |
| pool_.get()), |
| /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, /*first_row_id=*/0, |
| /*write_cols=*/std::vector<std::string>({"blob"})); |
| auto file_meta3 = std::make_shared<DataFileMeta>( |
| "data-xxx.blob", /*file_size=*/3023, /*row_count=*/1, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({NullType()}, {NullType()}, std::vector<int64_t>({0}), |
| pool_.get()), |
| /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, /*first_row_id=*/3, |
| /*write_cols=*/std::vector<std::string>({"blob"})); |
| std::vector<std::shared_ptr<DataFileMeta>> expected_meta = {file_meta1, file_meta2, file_meta3}; |
| |
    // NOTE: Because the write logic of C++ Paimon differs from that of Java Paimon, the
    // first_row_id in the CommitMessage may differ as well. The first_row_id is rewritten
    // during the commit process, so no expected commit messages are checked here.
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch), commit_identifier++, |
| /*expected_commit_messages=*/std::nullopt)); |
| ASSERT_EQ(commit_msgs.size(), 1); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot); |
| ASSERT_EQ(1, snapshot.value().Id()); |
| ASSERT_EQ(8, snapshot.value().TotalRecordCount().value()); |
| ASSERT_EQ(8, snapshot.value().DeltaRecordCount().value()); |
| ASSERT_EQ(4, snapshot.value().NextRowId().value()); |
| |
| // check data file meta after commit |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits.size(), 1); |
| auto data_split = std::dynamic_pointer_cast<DataSplitImpl>(data_splits[0]); |
| ASSERT_EQ(data_split->DataFiles().size(), 3); |
| for (size_t i = 0; i < expected_meta.size(); i++) { |
| ASSERT_TRUE(data_split->DataFiles()[i]->TEST_Equal(*expected_meta[i])); |
| } |
| } |
| |
| TEST_P(WriteInteTest, TestAppendTableWithDateFieldAsPartitionField) { |
    // test a date32 field as the partition field (values are days since the Unix epoch)
| auto dir = UniqueTestDirectory::Create(); |
| arrow::FieldVector fields = { |
| arrow::field("f0", arrow::date32()), |
| arrow::field("f1", arrow::int32()), |
| }; |
| auto schema = arrow::schema(fields); |
| |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = {{Options::MANIFEST_FORMAT, "orc"}, |
| {Options::FILE_FORMAT, file_format}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::BUCKET, "-1"}, |
| {Options::FILE_SYSTEM, "local"}}; |
| |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{"f0"}, |
| /*primary_keys=*/{}, options, /*is_streaming_mode=*/false)); |
| |
| std::string data_1 = |
| R"([[2005, 11], |
| [2005, 12]])"; |
| ASSERT_OK_AND_ASSIGN( |
| std::unique_ptr<RecordBatch> batch_1, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data_1, |
| /*partition_map=*/{{"f0", "2005"}}, /*bucket=*/0, {})); |
| |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-xxx.xxx", /*file_size=*/543, /*row_count=*/2, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({2005, 11}, {2005, 12}, std::vector<int64_t>({0, 0}), |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1724090888706ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| file_meta = ReconstructDataFileMeta(file_meta); |
| DataIncrement data_increment({file_meta}, {}, {}); |
| std::shared_ptr<CommitMessage> expected_commit_message_1 = std::make_shared<CommitMessageImpl>( |
| BinaryRowGenerator::GenerateRow({2005}, pool_.get()), /*bucket=*/0, |
| /*total_bucket=*/-1, data_increment, CompactIncrement({}, {}, {})); |
| std::vector<std::shared_ptr<CommitMessage>> expected_commit_messages_1 = { |
| expected_commit_message_1}; |
| int64_t commit_identifier = 0; |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch_1), commit_identifier++, |
| expected_commit_messages_1)); |
| CheckCreationTime(commit_msgs); |
| ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot1, helper->LatestSnapshot()); |
| ASSERT_TRUE(snapshot1); |
| ASSERT_EQ(1, snapshot1.value().Id()); |
| ASSERT_EQ(2, snapshot1.value().TotalRecordCount().value()); |
| ASSERT_EQ(2, snapshot1.value().DeltaRecordCount().value()); |
| |
| arrow::FieldVector fields_with_row_kind = fields; |
| fields_with_row_kind.insert(fields_with_row_kind.begin(), |
| arrow::field("_VALUE_KIND", arrow::int8())); |
| auto data_type = arrow::struct_(fields_with_row_kind); |
| ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits_1, |
| helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); |
| ASSERT_EQ(data_splits_1.size(), 1); |
| std::string expected_data_1 = |
| R"([[0, 2005, 11], |
| [0, 2005, 12]])"; |
| ASSERT_OK_AND_ASSIGN(bool success, |
| helper->ReadAndCheckResult(data_type, data_splits_1, expected_data_1)); |
| ASSERT_TRUE(success); |
| } |
| |
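// A null value written into a non-nullable field (f1) must be rejected at write time;
// the same batch with the null replaced by a value succeeds.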
| TEST_P(WriteInteTest, TestNullabilityCheck) { |
| auto dir = UniqueTestDirectory::Create(); |
| arrow::FieldVector fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/true), |
| arrow::field("f1", arrow::int32(), /*nullable=*/false)}; |
| auto schema = arrow::schema(fields); |
| auto file_format = GetParam(); |
| std::map<std::string, std::string> options = { |
| {Options::FILE_FORMAT, StringUtils::ToUpperCase(file_format)}, |
| {Options::TARGET_FILE_SIZE, "1024"}, |
| {Options::BUCKET, "-1"}, |
| }; |
| ASSERT_OK_AND_ASSIGN( |
| auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, |
| /*primary_keys=*/{}, options, /*is_streaming_mode=*/false)); |
| int64_t commit_identifier = 0; |
| |
| // write invalid data |
| std::string data = R"([ |
| ["banana", 2], |
| ["dog", 1], |
| [null, 14], |
| ["mouse", null] |
| ])"; |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data, |
| /*partition_map=*/{}, /*bucket=*/0, {})); |
| ASSERT_NOK_WITH_MSG( |
| helper->WriteAndCommit(std::move(batch), commit_identifier++, |
| /*expected_commit_messages=*/std::nullopt), |
| "CheckNullabilityMatch failed, field f1 not nullable while data have null value"); |
| |
| // write valid data |
| data = R"([ |
| ["banana", 2], |
| ["dog", 1], |
| [null, 14], |
| ["mouse", 100] |
| ])"; |
| ASSERT_OK_AND_ASSIGN(batch, |
| TestHelper::MakeRecordBatch(arrow::struct_(fields), data, |
| /*partition_map=*/{}, /*bucket=*/0, {})); |
| ASSERT_OK_AND_ASSIGN(auto commit_msgs, |
| helper->WriteAndCommit(std::move(batch), commit_identifier++, |
| /*expected_commit_messages=*/std::nullopt)); |
| } |
| |
| } // namespace paimon::test |