| /* |
| * Copyright 2024-present Alibaba Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <cstdint> |
| #include <cstring> |
| #include <map> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <variant> |
| #include <vector> |
| |
| #include "arrow/api.h" |
| #include "gtest/gtest.h" |
| #include "paimon/common/data/binary_row.h" |
| #include "paimon/common/data/data_define.h" |
| #include "paimon/common/utils/decimal_utils.h" |
| #include "paimon/core/io/data_file_meta.h" |
| #include "paimon/core/manifest/file_source.h" |
| #include "paimon/core/stats/simple_stats.h" |
| #include "paimon/core/table/source/data_split_impl.h" |
| #include "paimon/data/decimal.h" |
| #include "paimon/data/timestamp.h" |
| #include "paimon/defs.h" |
| #include "paimon/memory/bytes.h" |
| #include "paimon/memory/memory_pool.h" |
| #include "paimon/predicate/literal.h" |
| #include "paimon/predicate/predicate_builder.h" |
| #include "paimon/result.h" |
| #include "paimon/scan_context.h" |
| #include "paimon/status.h" |
| #include "paimon/table/source/plan.h" |
| #include "paimon/table/source/startup_mode.h" |
| #include "paimon/table/source/table_scan.h" |
| #include "paimon/testing/utils/binary_row_generator.h" |
| #include "paimon/testing/utils/testharness.h" |
| |
| namespace paimon::test { |
| class ScanInteTest : public testing::Test { |
| public: |
| std::vector<std::shared_ptr<DataSplitImpl>> CollectDataSplits( |
| const std::shared_ptr<Plan>& plan) const { |
| std::vector<std::shared_ptr<DataSplitImpl>> result_data_splits; |
| for (const auto& result : plan->Splits()) { |
| auto data_split = std::dynamic_pointer_cast<DataSplitImpl>(result); |
| EXPECT_TRUE(data_split); |
| result_data_splits.push_back(data_split); |
| } |
| return result_data_splits; |
| } |
| |
| void CheckResult(std::vector<std::shared_ptr<DataSplitImpl>> expected, |
| std::vector<std::shared_ptr<DataSplitImpl>> result) const { |
| ASSERT_EQ(result.size(), expected.size()); |
| for (size_t i = 0; i < result.size(); i++) { |
| ASSERT_EQ(*result[i], *expected[i]) << result[i]->ToString() << std::endl |
| << expected[i]->ToString(); |
| } |
| } |
| |
| void CheckStreamScanResult(TableScan* table_scan, |
| const std::vector<std::optional<int64_t>> expected_snapshot_ids, |
| const std::vector<std::vector<std::shared_ptr<DataSplitImpl>>>& |
| expected_data_splits) const { |
| size_t scan_id = 0; |
| while (true) { |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| if (scan_id == expected_snapshot_ids.size()) { |
| // no snapshot |
| ASSERT_EQ(std::nullopt, result_plan->SnapshotId()); |
| ASSERT_TRUE(result_plan->Splits().empty()); |
| return; |
| } |
| // check snapshot ids |
| ASSERT_EQ(result_plan->SnapshotId(), expected_snapshot_ids[scan_id]); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> result_data_splits; |
| for (const auto& result : result_plan->Splits()) { |
| auto data_split = std::dynamic_pointer_cast<DataSplitImpl>(result); |
| ASSERT_TRUE(data_split); |
| result_data_splits.push_back(data_split); |
| } |
| // check data splits |
| CheckResult(expected_data_splits[scan_id], result_data_splits); |
| scan_id++; |
| } |
| } |
| |
| private: |
| std::shared_ptr<MemoryPool> pool_ = GetDefaultPool(); |
| |
| std::shared_ptr<arrow::DataType> arrow_data_type_ = |
| arrow::struct_({arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), |
| arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}); |
| |
| std::shared_ptr<DataFileMeta> meta_snapshot1_partition10_bucket0_ = |
| std::make_shared<DataFileMeta>( |
| "data-d41fd7d1-b3e4-4905-aad9-b20a780e90a2-0.orc", /*file_size=*/543, /*row_count=*/1, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1, 11.1}, |
| {std::string("Alice"), 10, 1, 11.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1721643142435ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| |
| std::shared_ptr<DataFileMeta> meta_snapshot1_partition10_bucket1_ = |
| std::make_shared<DataFileMeta>( |
| "data-4e30d6c0-f109-4300-a010-4ba03047dd9d-0.orc", /*file_size=*/575, /*row_count=*/3, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0, 12.1}, |
| {std::string("Tony"), 10, 0, 14.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1721643142456ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| |
| std::shared_ptr<DataFileMeta> meta_snapshot1_partition20_bucket0_ = |
| std::make_shared<DataFileMeta>( |
| "data-db2b44c0-0d73-449d-82a0-4075bd2cb6e3-0.orc", /*file_size=*/541, /*row_count=*/1, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Lucy"), 20, 1, 14.1}, |
| {std::string("Lucy"), 20, 1, 14.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1721643142472ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| |
| std::shared_ptr<DataFileMeta> meta_snapshot2_partition10_bucket1_ = |
| std::make_shared<DataFileMeta>( |
| "data-10b9eea8-241d-4e4b-8ab8-2a82d72d79a2-0.orc", /*file_size=*/589, /*row_count=*/3, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Alex"), 10, 0, 12.1}, |
| {std::string("Emily"), 10, 0, 16.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/3, /*max_sequence_number=*/5, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1721643267385ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| |
| std::shared_ptr<DataFileMeta> meta_snapshot2_partition20_bucket0_ = |
| std::make_shared<DataFileMeta>( |
| "data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc", /*file_size=*/506, /*row_count=*/1, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Paul"), 20, 1, NullType()}, |
| {std::string("Paul"), 20, 1, NullType()}, |
| {0, 0, 0, 1}, pool_.get()), |
| /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1721643267404ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| |
| std::shared_ptr<DataFileMeta> meta_snapshot3_partition10_bucket1_ = |
| std::make_shared<DataFileMeta>( |
| "data-e2bb59ee-ae25-4e5b-9bcc-257250bc5fdd-0.orc", /*file_size=*/541, /*row_count=*/1, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("David"), 10, 0, 17.1}, |
| {std::string("David"), 10, 0, 17.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/6, /*max_sequence_number=*/6, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1721643314161ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| |
| std::shared_ptr<DataFileMeta> meta_snapshot4_partition10_bucket1_ = |
| std::make_shared<DataFileMeta>( |
| "data-2d5ea1ea-77c1-47ff-bb87-19a509962a37-0.orc", /*file_size=*/538, /*row_count=*/1, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Lily"), 10, 0, 17.1}, |
| {std::string("Lily"), 10, 0, 17.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/7, /*max_sequence_number=*/7, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1721643834400ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| |
| std::shared_ptr<DataFileMeta> meta_snapshot5_partition10_bucket1_ = |
| std::make_shared<DataFileMeta>( |
| "data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc", /*file_size=*/640, /*row_count=*/8, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Alex"), 10, 0, 12.1}, |
| {std::string("Tony"), 10, 0, 17.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/7, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1721643834472ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Compact(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| }; |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot1) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<ScanContext> scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 1); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot1_partition10_bucket1_}); |
| auto expected_data_split2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder3( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", |
| {meta_snapshot1_partition20_bucket0_}); |
| auto expected_data_split3 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = { |
| expected_data_split1, expected_data_split2, expected_data_split3}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot3) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 3); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot1_partition10_bucket1_, meta_snapshot2_partition10_bucket1_, |
| meta_snapshot3_partition10_bucket1_}); |
| auto expected_data_split2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder3( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", |
| {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); |
| auto expected_data_split3 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = { |
| expected_data_split1, expected_data_split2, expected_data_split3}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanInvalidSnapshot) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "100"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_NOK_WITH_MSG( |
| table_scan->CreatePlan(), |
| "The specified scan snapshotId 100 is out of available snapshotId range [1, 5]."); |
| } |
| |
| TEST_F(ScanInteTest, TestBatchScanMultipleTimes) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // batch scan multiple |
| ASSERT_NOK_WITH_MSG(table_scan->CreatePlan(), "end of scan"); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot3WithSplitTargetSize) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3") |
| .AddOption(Options::SOURCE_SPLIT_OPEN_FILE_COST, "1024") |
| .AddOption(Options::SOURCE_SPLIT_TARGET_SIZE, "2048"); |
| // open cost = 1024, and split target size is 2048, indicates at most 2 files in a split |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 3); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot1_partition10_bucket1_, meta_snapshot2_partition10_bucket1_}); |
| auto expected_data_split2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder3( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot3_partition10_bucket1_}); |
| auto expected_data_split3 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder4( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", |
| {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); |
| auto expected_data_split4 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder4.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = { |
| expected_data_split1, expected_data_split2, expected_data_split3, expected_data_split4}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot3WithRowCountLimit) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3").SetLimit(3); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 3); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| // since row limit is set to 3, we only return partition10, bucket0 and partition10, bucket1 in |
| // plan (without partition20, bucket0) |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder3( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot1_partition10_bucket1_, meta_snapshot2_partition10_bucket1_, |
| meta_snapshot3_partition10_bucket1_}); |
| auto expected_data_split3 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split1, |
| expected_data_split3}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot3WithBucketFilter) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.SetBucketFilter(0).AddOption(Options::SCAN_SNAPSHOT_ID, "3"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 3); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder3( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", |
| {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); |
| auto expected_data_split3 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split1, |
| expected_data_split3}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithStreamWithDefaultMode) { |
| // from snapshot is specified |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1").WithStreamingMode(true); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| DataSplitImpl::Builder builder1_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1_1.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder1_2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot1_partition10_bucket1_}); |
| auto expected_data_split1_2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1_2.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder1_3( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", |
| {meta_snapshot1_partition20_bucket0_}); |
| auto expected_data_split1_3 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1_3.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| // second scan |
| DataSplitImpl::Builder builder2_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot2_partition10_bucket1_}); |
| auto expected_data_split2_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2_1.WithTotalBuckets(2) |
| .WithSnapshot(2) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder2_2( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", |
| {meta_snapshot2_partition20_bucket0_}); |
| auto expected_data_split2_2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2_2.WithTotalBuckets(2) |
| .WithSnapshot(2) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| // third scan |
| DataSplitImpl::Builder builder3_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot3_partition10_bucket1_}); |
| auto expected_data_split3_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3_1.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| // fourth scan |
| DataSplitImpl::Builder builder4_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot4_partition10_bucket1_}); |
| auto expected_data_split4_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder4_1.WithTotalBuckets(2) |
| .WithSnapshot(4) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::vector<std::shared_ptr<DataSplitImpl>>> expected_data_splits = { |
| {}, |
| {expected_data_split1_1, expected_data_split1_2, expected_data_split1_3}, |
| {expected_data_split2_1, expected_data_split2_2}, |
| {expected_data_split3_1}, |
| {expected_data_split4_1}}; |
| |
| std::vector<std::optional<int64_t>> expected_snapshot_ids = {std::nullopt, 1, 2, 3, 4}; |
| CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, expected_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithStreamOfLatestFullMode) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_MODE, StartupMode::LatestFull().ToString()) |
| .WithStreamingMode(true); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(5) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot5_partition10_bucket1_}); |
| auto expected_data_split2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2.WithTotalBuckets(2) |
| .WithSnapshot(5) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder3( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", |
| {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); |
| auto expected_data_split3 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3.WithTotalBuckets(2) |
| .WithSnapshot(5) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::vector<std::shared_ptr<DataSplitImpl>>> expected_data_splits = { |
| {expected_data_split1, expected_data_split2, expected_data_split3}}; |
| |
| std::vector<std::optional<int64_t>> expected_snapshot_ids = {5}; |
| CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, expected_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithBatchScanOfLatestMode) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_MODE, StartupMode::Latest().ToString()); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 5); |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(5) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot5_partition10_bucket1_}); |
| auto expected_data_split2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2.WithTotalBuckets(2) |
| .WithSnapshot(5) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder3( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", |
| {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); |
| auto expected_data_split3 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3.WithTotalBuckets(2) |
| .WithSnapshot(5) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = { |
| expected_data_split1, expected_data_split2, expected_data_split3}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithStreamOfLatestMode) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_MODE, StartupMode::Latest().ToString()) |
| .WithStreamingMode(true); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| std::vector<std::optional<int64_t>> expected_snapshot_ids = {}; |
| CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, {}); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithStreamOfFromSnapshotMode) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_MODE, StartupMode::FromSnapshot().ToString()) |
| .AddOption(Options::SCAN_SNAPSHOT_ID, "2") |
| .WithStreamingMode(true); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| DataSplitImpl::Builder builder2_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot2_partition10_bucket1_}); |
| auto expected_data_split2_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2_1.WithTotalBuckets(2) |
| .WithSnapshot(2) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder2_2( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", |
| {meta_snapshot2_partition20_bucket0_}); |
| auto expected_data_split2_2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2_2.WithTotalBuckets(2) |
| .WithSnapshot(2) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder3_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot3_partition10_bucket1_}); |
| auto expected_data_split3_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3_1.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder4_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot4_partition10_bucket1_}); |
| auto expected_data_split4_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder4_1.WithTotalBuckets(2) |
| .WithSnapshot(4) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::vector<std::shared_ptr<DataSplitImpl>>> expected_data_splits = { |
| {}, |
| {expected_data_split2_1, expected_data_split2_2}, |
| {expected_data_split3_1}, |
| {expected_data_split4_1}}; |
| |
| std::vector<std::optional<int64_t>> expected_snapshot_ids = {std::nullopt, 2, 3, 4}; |
| CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, expected_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithStreamOfFromSnapshotFullMode) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_MODE, StartupMode::FromSnapshotFull().ToString()) |
| .AddOption(Options::SCAN_SNAPSHOT_ID, "2") |
| .WithStreamingMode(true); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_TRUE(scan_context); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| DataSplitImpl::Builder builder1_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1_1.WithTotalBuckets(2) |
| .WithSnapshot(2) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder1_2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot1_partition10_bucket1_, meta_snapshot2_partition10_bucket1_}); |
| auto expected_data_split1_2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1_2.WithTotalBuckets(2) |
| .WithSnapshot(2) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder1_3( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", |
| {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); |
| auto expected_data_split1_3 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1_3.WithTotalBuckets(2) |
| .WithSnapshot(2) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder3_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot3_partition10_bucket1_}); |
| auto expected_data_split3_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3_1.WithTotalBuckets(2) |
| .WithSnapshot(3) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder4_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot4_partition10_bucket1_}); |
| auto expected_data_split4_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder4_1.WithTotalBuckets(2) |
| .WithSnapshot(4) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::vector<std::shared_ptr<DataSplitImpl>>> expected_data_splits = { |
| {expected_data_split1_1, expected_data_split1_2, expected_data_split1_3}, |
| {expected_data_split3_1}, |
| {expected_data_split4_1}}; |
| |
| std::vector<std::optional<int64_t>> expected_snapshot_ids = {2, 3, 4}; |
| CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, expected_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithInvalidOptions) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| { |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_MODE, StartupMode::FromSnapshot().ToString()) |
| .WithStreamingMode(true); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_NOK_WITH_MSG( |
| table_scan->CreatePlan(), |
| "scan.snapshot-id or scan.tag-name must be set when startup mode is FROM_SNAPSHOT"); |
| } |
| { |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::BUCKET, "-2").WithStreamingMode(true); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_NOK_WITH_MSG(TableScan::Create(std::move(scan_context)), |
| "do not support bucket=-2 in scan process"); |
| } |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithEqualPredicate) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| |
| std::string val("Bob"); |
| auto predicate = |
| PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, |
| Literal(FieldType::STRING, val.data(), val.size())); |
| |
| ScanContextBuilder context_builder(table_path); |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 1); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| DataSplitImpl::Builder builder( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot1_partition10_bucket1_}); |
| auto expected_data_split = std::dynamic_pointer_cast<DataSplitImpl>(builder.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithStreamWithAndPredicate) { |
| // from snapshot is specified |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| |
| auto greater_than_predicate = PredicateBuilder::GreaterThan( |
| /*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE, Literal(13.1)); |
| std::string val("Paul"); |
| auto less_than_predicate = PredicateBuilder::LessThan( |
| /*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, |
| Literal(FieldType::STRING, val.data(), val.size())); |
| val = "David"; |
| auto not_in_predicate = |
| PredicateBuilder::NotIn(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, |
| {Literal(FieldType::STRING, val.data(), val.size())}); |
| auto not_equal = PredicateBuilder::NotEqual(/*field_index=*/1, /*field_name=*/"f1", |
| FieldType::INT, {Literal(20)}); |
| ASSERT_OK_AND_ASSIGN(auto predicate, |
| PredicateBuilder::And({greater_than_predicate, less_than_predicate, |
| not_in_predicate, not_equal})); |
| |
| ScanContextBuilder context_builder(table_path); |
| context_builder.SetPredicate(predicate) |
| .AddOption(Options::SCAN_SNAPSHOT_ID, "1") |
| .WithStreamingMode(true); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| |
| DataSplitImpl::Builder builder1_2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot1_partition10_bucket1_}); |
| auto expected_data_split1_2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1_2.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| // second scan |
| DataSplitImpl::Builder builder2_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot2_partition10_bucket1_}); |
| auto expected_data_split2_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2_1.WithTotalBuckets(2) |
| .WithSnapshot(2) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| // files in third scan is all filtered |
| // fourth scan |
| DataSplitImpl::Builder builder4_1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot4_partition10_bucket1_}); |
| auto expected_data_split4_1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder4_1.WithTotalBuckets(2) |
| .WithSnapshot(4) |
| .IsStreaming(true) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::vector<std::shared_ptr<DataSplitImpl>>> expected_data_splits = { |
| {}, {expected_data_split1_2}, {expected_data_split2_1}, {}, {expected_data_split4_1}}; |
| |
| std::vector<std::optional<int64_t>> expected_snapshot_ids = {std::nullopt, 1, 2, 3, 4}; |
| CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, expected_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithPartitionFilter) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| |
| std::map<std::string, std::string> partition_keys; |
| partition_keys["f1"] = "10"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.SetPartitionFilter({partition_keys}).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 1); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot1_partition10_bucket1_}); |
| auto expected_data_split2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split1, |
| expected_data_split2}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithInvalidPartitionFilter) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| |
| std::map<std::string, std::string> partition_keys; |
| partition_keys["invalid_partition_key"] = "10"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.SetPartitionFilter({partition_keys}).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_NOK_WITH_MSG(TableScan::Create(std::move(scan_context)), |
| "field invalid_partition_key does not exist in partition keys"); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithPartitionFilterAndPredicateFilter) { |
| std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; |
| |
| // set predicate filter, f1 = 20 |
| auto predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, |
| Literal(20)); |
| // set partition filter, f1 = 10 |
| std::map<std::string, std::string> partition_keys; |
| partition_keys["f1"] = "10"; |
| |
| ScanContextBuilder context_builder(table_path); |
| context_builder.SetPredicate(predicate) |
| .SetPartitionFilter({partition_keys}) |
| .AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 1); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", |
| {meta_snapshot1_partition10_bucket0_}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", |
| {meta_snapshot1_partition10_bucket1_}); |
| auto expected_data_split2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split1, |
| expected_data_split2}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithMultiPartitionKeys) { |
| std::string table_path = paimon::test::GetDataDir() + |
| "orc/multi_partition_append_table.db/multi_partition_append_table"; |
| |
| // set partition filter, f1 = 10, f2 = 0 |
| std::map<std::string, std::string> partition_keys; |
| partition_keys["f1"] = "10"; |
| partition_keys["f2"] = "0"; |
| |
| ScanContextBuilder context_builder(table_path); |
| context_builder.SetPartitionFilter({partition_keys}).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 1); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| auto meta = std::make_shared<DataFileMeta>( |
| "data-01b6a930-6564-409b-b8f4-ed1307790d72-0.orc", /*file_size=*/575, /*row_count=*/3, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0, 12.1}, |
| {std::string("Tony"), 10, 0, 14.1}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1728497439433ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10, 0}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/multi_partition_append_table.db/multi_partition_append_table/f1=10/f2=0/bucket-0", |
| {meta}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(-1) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split1}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| // test complex type ts & decimal |
| TEST_F(ScanInteTest, TestScanAppendComplexDataWithSnapshot4WithPredicateFilter) { |
| std::string table_path = |
| paimon::test::GetDataDir() + "orc/append_complex_data.db/append_complex_data"; |
| // set predicate filter |
| // less than 2024 |
| auto predicate1 = |
| PredicateBuilder::LessThan(/*field_index=*/3, /*field_name=*/"f4", FieldType::TIMESTAMP, |
| Literal(paimon::Timestamp(1735344000, 0))); |
| auto predicate2 = PredicateBuilder::GreaterThan( |
| /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL, |
| Literal(paimon::Decimal(5, 2, 0))); |
| ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({predicate1, predicate2})); |
| |
| ScanContextBuilder context_builder(table_path); |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "4"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_TRUE(result_plan); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 4); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| auto meta = std::make_shared<DataFileMeta>( |
| "data-14a30421-7650-486c-9876-66a1fa4356ff-0.orc", /*file_size=*/1004, /*row_count=*/6, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats( |
| {10, 1, 0, TimestampType(Timestamp(-2240521239999ll, 1001), 9), |
| Decimal(23, 5, DecimalUtils::StrToInt128("-12345000").value()), NullType()}, |
| {10, 1, 20006, TimestampType(Timestamp(2000000000000ll, 1001), 9), |
| Decimal(23, 5, DecimalUtils::StrToInt128("12345678998765432145678").value()), |
| NullType()}, |
| {0, 0, 1, 0, 1, 1}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/5, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1767506722625ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Compact(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/append_complex_data.db/append_complex_data/f1=10/bucket-0", |
| {meta}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(4) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split1}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| // test complex type date & binary |
| TEST_F(ScanInteTest, TestScanAppendComplexDataWithSnapshot4WithPredicateFilter2) { |
| std::string table_path = |
| paimon::test::GetDataDir() + "orc/append_complex_data.db/append_complex_data"; |
| // set predicate filter |
| auto predicate1 = PredicateBuilder::GreaterThan(/*field_index=*/2, /*field_name=*/"f3", |
| FieldType::DATE, Literal(FieldType::DATE, 0)); |
| // BINARY does not have stats in manifest, min/max in value stats is null |
| // if row_count != null_count and min/max is null, file will not be filtered |
| auto predicate2 = PredicateBuilder::GreaterThan( |
| /*field_index=*/5, /*field_name=*/"f6", FieldType::BINARY, |
| Literal(FieldType::BINARY, "zoo", 3)); |
| ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({predicate1, predicate2})); |
| |
| ScanContextBuilder context_builder(table_path); |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "4"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(result_plan->SnapshotId().value(), 4); |
| |
| // check data splits |
| std::vector<std::shared_ptr<DataSplitImpl>> result_data_splits; |
| for (const auto& result : result_plan->Splits()) { |
| auto data_split = std::dynamic_pointer_cast<DataSplitImpl>(result); |
| ASSERT_TRUE(data_split); |
| result_data_splits.push_back(data_split); |
| } |
| |
| auto meta = std::make_shared<DataFileMeta>( |
| "data-14a30421-7650-486c-9876-66a1fa4356ff-0.orc", /*file_size=*/1004, /*row_count=*/6, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats( |
| {10, 1, 0, TimestampType(Timestamp(-2240521239999ll, 1001), 9), |
| Decimal(23, 5, DecimalUtils::StrToInt128("-12345000").value()), NullType()}, |
| {10, 1, 20006, TimestampType(Timestamp(2000000000000ll, 1001), 9), |
| Decimal(23, 5, DecimalUtils::StrToInt128("12345678998765432145678").value()), |
| NullType()}, |
| {0, 0, 1, 0, 1, 1}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/5, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1767506722625ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Compact(), |
| /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, |
| /*first_row_id=*/std::nullopt, |
| /*write_cols=*/std::nullopt); |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/append_complex_data.db/append_complex_data/f1=10/bucket-0", |
| {meta}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(4) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split1}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithEnableStatsDenseStore) { |
| std::string table_path = paimon::test::GetDataDir() + |
| "orc/append_10_stats_dense_store.db/append_10_stats_dense_store"; |
| ScanContextBuilder context_builder(table_path); |
| auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f3", |
| FieldType::DOUBLE, Literal(13.0)); |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| auto file_meta1 = std::make_shared<DataFileMeta>( |
| "data-cdb38c8a-31c1-4824-a024-9abd3fbb466f-0.orc", /*file_size=*/543, /*row_count=*/1, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1}, |
| {std::string("Alice"), 10, 1}, {0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1731412938869ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::optional<std::vector<std::string>>({"f0", "f1", "f2"}), |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| auto file_meta2 = std::make_shared<DataFileMeta>( |
| "data-c2613568-0412-4cd9-a0c4-1eae8e4ca89b-0.orc", /*file_size=*/575, /*row_count=*/3, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0}, {std::string("Tony"), 10, 0}, |
| {0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1731412938891ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::optional<std::vector<std::string>>({"f0", "f1", "f2"}), |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| auto file_meta3 = std::make_shared<DataFileMeta>( |
| "data-a6d1261a-f798-4fbd-a251-6d6c7d8060dd-0.orc", /*file_size=*/541, /*row_count=*/1, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Lucy"), 20, 1}, |
| {std::string("Lucy"), 20, 1}, {0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1731412938908ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::optional<std::vector<std::string>>({"f0", "f1", "f2"}), |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| |
| DataSplitImpl::Builder builder1( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/append_10_stats_dense_store.db/append_10_stats_dense_store/f1=10/bucket-0", |
| {file_meta1}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| |
| DataSplitImpl::Builder builder2( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/append_10_stats_dense_store.db/append_10_stats_dense_store/f1=10/bucket-1", |
| {file_meta2}); |
| auto expected_data_split2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| DataSplitImpl::Builder builder3( |
| BinaryRowGenerator::GenerateRow({20}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/append_10_stats_dense_store.db/append_10_stats_dense_store/f1=20/bucket-0", |
| {file_meta3}); |
| auto expected_data_split3 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder3.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = { |
| expected_data_split1, expected_data_split2, expected_data_split3}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithEnableStatsDenseStore2) { |
| std::string table_path = paimon::test::GetDataDir() + |
| "orc/append_10_stats_dense_store.db/append_10_stats_dense_store"; |
| ScanContextBuilder context_builder(table_path); |
| auto greater_than = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f3", |
| FieldType::DOUBLE, Literal(13.0)); |
| auto equal = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, |
| Literal(FieldType::STRING, "Emily", 5)); |
| auto predicate = PredicateBuilder::And({greater_than, equal}).value(); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-c2613568-0412-4cd9-a0c4-1eae8e4ca89b-0.orc", /*file_size=*/575, /*row_count=*/3, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0}, {std::string("Tony"), 10, 0}, |
| {0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1731412938891ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::optional<std::vector<std::string>>({"f0", "f1", "f2"}), |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| |
| DataSplitImpl::Builder builder( |
| BinaryRowGenerator::GenerateRow({10}, pool_.get()), |
| /*bucket=*/1, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/append_10_stats_dense_store.db/append_10_stats_dense_store/f1=10/bucket-1", |
| {file_meta}); |
| auto expected_data_split = std::dynamic_pointer_cast<DataSplitImpl>(builder.WithTotalBuckets(2) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanPKWithSnapshot1WithBucketStats) { |
| std::string table_path = paimon::test::GetDataDir() + |
| "orc/pk_table_with_total_buckets.db/pk_table_with_total_buckets"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1").SetBucketFilter(2); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_EQ(result_plan->SnapshotId().value(), 1); |
| ASSERT_TRUE(result_plan->Splits().empty()); |
| } |
| |
| TEST_F(ScanInteTest, TestScanPKWithInvalidOptions) { |
| std::string table_path = paimon::test::GetDataDir() + |
| "orc/pk_table_with_total_buckets.db/pk_table_with_total_buckets"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::BUCKET, "-1").WithStreamingMode(true); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_NOK_WITH_MSG(TableScan::Create(std::move(scan_context)), |
| "do not support pk table bucket=-1 in scan process"); |
| } |
| |
| TEST_F(ScanInteTest, TestReadWithNoSnapshot) { |
| std::string table_path = paimon::test::GetDataDir() + |
| "orc/append_table_with_nested_type.db/append_table_with_nested_type"; |
| ScanContextBuilder context_builder(table_path); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_FALSE(result_plan->SnapshotId()); |
| ASSERT_TRUE(result_plan->Splits().empty()); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithAlterTableWithCast) { |
| std::string table_path = |
| paimon::test::GetDataDir() + |
| "orc/append_table_alter_table_with_cast.db/append_table_alter_table_with_cast"; |
| ScanContextBuilder context_builder(table_path); |
| |
| auto child1 = |
| PredicateBuilder::Or( |
| {PredicateBuilder::IsNotNull(/*field_index=*/0, /*field_name=*/"f4", |
| FieldType::TIMESTAMP), |
| PredicateBuilder::IsNotNull(/*field_index=*/1, /*field_name=*/"key0", FieldType::INT), |
| PredicateBuilder::IsNotNull(/*field_index=*/2, /*field_name=*/"key1", FieldType::INT)}) |
| .value(); |
| |
| auto sub_child1 = |
| PredicateBuilder::IsNotNull(/*field_index=*/3, /*field_name=*/"f3", FieldType::INT); |
| auto sub_child2 = |
| PredicateBuilder::IsNotNull(/*field_index=*/4, /*field_name=*/"f1", FieldType::STRING); |
| auto sub_child3 = |
| PredicateBuilder::IsNotNull(/*field_index=*/5, /*field_name=*/"f2", FieldType::DECIMAL); |
| auto child2 = PredicateBuilder::And({sub_child1, sub_child2, sub_child3}).value(); |
| |
| auto child3 = PredicateBuilder::GreaterThan(/*field_index=*/7, /*field_name=*/"f6", |
| FieldType::INT, Literal(80)); |
| ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2, child3})); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-81a1c016-765b-48c9-b209-0d8e95bf8a00-0.orc", /*file_size=*/1070, /*row_count=*/2, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats( |
| {TimestampType(Timestamp(1732603136084l, 84), 9), 1, 1, 180, |
| std::string("2024-11-26 15:29"), Decimal(6, 3, -999420), false, -86}, |
| {TimestampType(Timestamp(1732603136094l, 94), 9), 1, 1, 190, std::string("I'm strange"), |
| Decimal(6, 3, 8032), true, 96}, |
| {0, 0, 0, 0, 0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/1, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1732635461460ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| |
| DataSplitImpl::Builder builder(BinaryRowGenerator::GenerateRow({1, 1}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/append_table_alter_table_with_cast.db/" |
| "append_table_alter_table_with_cast/key0=1/key1=1/bucket-0", |
| {file_meta}); |
| auto expected_data_split = std::dynamic_pointer_cast<DataSplitImpl>(builder.WithTotalBuckets(-1) |
| .WithSnapshot(2) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithAlterTableWithNoCast) { |
| std::string table_path = paimon::test::GetDataDir() + |
| "orc/append_table_with_alter_table.db/append_table_with_alter_table"; |
| ScanContextBuilder context_builder(table_path); |
| |
| auto child1 = PredicateBuilder::GreaterThan(/*field_index=*/2, /*field_name=*/"k", |
| FieldType::INT, Literal(36)); |
| auto child2 = PredicateBuilder::LessThan(/*field_index=*/2, /*field_name=*/"k", FieldType::INT, |
| Literal(96)); |
| ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| auto file_meta1 = std::make_shared<DataFileMeta>( |
| "data-492ed5ab-4740-4e93-8a0a-79a6893b1770-0.orc", /*file_size=*/603, /*row_count=*/2, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({1, 1, 42, 43, 44, 45, 46}, {1, 1, 52, 53, 54, 55, 56}, |
| {0, 0, 0, 0, 0, 0, 0}, pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1730458825047ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| |
| DataSplitImpl::Builder builder1(BinaryRowGenerator::GenerateRow({1, 1}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/append_table_with_alter_table.db/" |
| "append_table_with_alter_table/key0=1/key1=1/bucket-0", |
| {file_meta1}); |
| auto expected_data_split1 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(-1) |
| .WithSnapshot(2) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| auto file_meta2 = std::make_shared<DataFileMeta>( |
| "data-b34cd128-03e3-4e70-ba9c-5dec2183849c-0.orc", /*file_size=*/680, /*row_count=*/3, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({0, 1, 66, 63, 517, 65, 618}, |
| {0, 1, 86, 83, 537, 85, 638}, {0, 0, 0, 0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/1, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1730459969493ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| |
| DataSplitImpl::Builder builder2(BinaryRowGenerator::GenerateRow({0, 1}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/append_table_with_alter_table.db/" |
| "append_table_with_alter_table/key0=0/key1=1/bucket-0", |
| {file_meta2}); |
| auto expected_data_split2 = |
| std::dynamic_pointer_cast<DataSplitImpl>(builder2.WithTotalBuckets(-1) |
| .WithSnapshot(2) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split1, |
| expected_data_split2}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithAlterTableWithDenseField) { |
| std::string table_path = paimon::test::GetDataDir() + |
| "orc/append_table_with_alter_table_with_dense_field.db/" |
| "append_table_with_alter_table_with_dense_field"; |
| ScanContextBuilder context_builder(table_path); |
| |
| auto child1 = PredicateBuilder::GreaterThan(/*field_index=*/2, /*field_name=*/"f1", |
| FieldType::INT, Literal(22)); |
| auto child3 = PredicateBuilder::LessThan(/*field_index=*/3, /*field_name=*/"f3", |
| FieldType::DOUBLE, Literal(0.0)); |
| ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child3})); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-e925c7db-58e3-45e3-b21e-b1a7774a5caf-0.orc", /*file_size=*/682, /*row_count=*/2, |
| /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), |
| /*key_stats=*/SimpleStats::EmptyStats(), |
| BinaryRowGenerator::GenerateStats({1, std::string("Cathy"), 13, 23}, |
| {1, std::string("David"), 14, 24}, {0, 0, 0, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/0, |
| /*level=*/0, /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1751647880163ll, 0), |
| /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::optional<std::vector<std::string>>({"key0", "f0", "f1", "f2"}), |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| DataSplitImpl::Builder builder( |
| BinaryRowGenerator::GenerateRow({1}, pool_.get()), |
| /*bucket=*/0, /*bucket_path=*/ |
| paimon::test::GetDataDir() + |
| "orc/append_table_with_alter_table_with_dense_field.db/" |
| "append_table_with_alter_table_with_dense_field/key0=1/bucket-0", |
| {file_meta}); |
| auto expected_data_split = std::dynamic_pointer_cast<DataSplitImpl>(builder.WithTotalBuckets(-1) |
| .WithSnapshot(2) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build() |
| .value()); |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithBitmapEmbeddedIndex) { |
| std::string table_path = |
| paimon::test::GetDataDir() + "orc/append_with_bitmap.db/append_with_bitmap/"; |
| ScanContextBuilder context_builder(table_path); |
| |
| auto child1 = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, |
| Literal(FieldType::STRING, "Tony", 4)); |
| auto child2 = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, |
| Literal(10)); |
| ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| std::vector<uint8_t> embedded_bytes = { |
| 0, 5, 78, 78, 208, 26, 53, 174, 0, 0, 0, 1, 0, 0, 0, 96, 0, 0, |
| 0, 3, 0, 2, 102, 48, 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, |
| 0, 0, 0, 96, 0, 0, 0, 176, 0, 2, 102, 49, 0, 0, 0, 1, 0, 6, |
| 98, 105, 116, 109, 97, 112, 0, 0, 1, 16, 0, 0, 0, 102, 0, 2, 102, 50, |
| 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, 0, 0, 1, 118, 0, 0, |
| 0, 108, 0, 0, 0, 0, 2, 0, 0, 0, 8, 0, 0, 0, 5, 0, 0, 0, |
| 0, 1, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, |
| 85, 0, 0, 0, 5, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, |
| 0, 0, 0, 20, 0, 0, 0, 3, 66, 111, 98, 0, 0, 0, 20, 0, 0, 0, |
| 20, 0, 0, 0, 5, 69, 109, 105, 108, 121, 255, 255, 255, 253, 255, 255, 255, 255, |
| 0, 0, 0, 4, 76, 117, 99, 121, 255, 255, 255, 251, 255, 255, 255, 255, 0, 0, |
| 0, 4, 84, 111, 110, 121, 0, 0, 0, 40, 0, 0, 0, 20, 58, 48, 0, 0, |
| 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 7, 0, 58, 48, |
| 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 1, 0, 5, 0, |
| 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 3, 0, |
| 6, 0, 2, 0, 0, 0, 8, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, |
| 0, 10, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, 0, 10, |
| 0, 0, 0, 22, 0, 0, 0, 26, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, |
| 0, 22, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, 0, 0, |
| 4, 0, 6, 0, 7, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 4, 0, |
| 16, 0, 0, 0, 0, 0, 1, 0, 2, 0, 3, 0, 5, 0, 2, 0, 0, 0, |
| 8, 0, 0, 0, 2, 1, 255, 255, 255, 248, 0, 0, 0, 18, 0, 0, 0, 1, |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 0, 0, 0, 1, 0, 0, 0, 22, |
| 0, 0, 0, 24, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, |
| 0, 0, 2, 0, 3, 0, 6, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, |
| 3, 0, 16, 0, 0, 0, 0, 0, 1, 0, 4, 0, 5, 0}; |
| auto embedded_index = std::make_shared<Bytes>(embedded_bytes.size(), pool_.get()); |
| memcpy(embedded_index->data(), reinterpret_cast<const void*>(embedded_bytes.data()), |
| embedded_bytes.size()); |
| |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-62feb610-c83f-4217-9b50-bbad9cd08eb4-0.orc", /*file_size=*/689, |
| /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(), |
| /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 0, 11.1}, |
| {std::string("Tony"), 20, 1, 18.1}, {0, 0, 1, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, |
| /*max_sequence_number=*/7, /*schema_id=*/0, |
| /*level=*/0, |
| /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1745000702835ll, 0), /*delete_row_count=*/0, |
| /*embedded_index=*/embedded_index, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*bucket_path=*/table_path + "bucket-0", {file_meta}); |
| ASSERT_OK_AND_ASSIGN(auto expected_data_split, builder.WithTotalBuckets(-1) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build()); |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithBitmapEmbeddedIndexWithEmptyResult) { |
| std::string table_path = |
| paimon::test::GetDataDir() + "orc/append_with_bitmap.db/append_with_bitmap/"; |
| ScanContextBuilder context_builder(table_path); |
| |
| auto child1 = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, |
| Literal(FieldType::STRING, "Lucy", 4)); |
| auto child2 = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, |
| Literal(10)); |
| ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_TRUE(result_plan->Splits().empty()); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithBitmapNoEmbeddedIndex) { |
| std::string table_path = |
| paimon::test::GetDataDir() + |
| "orc/append_with_bitmap_no_embedding.db/append_with_bitmap_no_embedding/"; |
| ScanContextBuilder context_builder(table_path); |
| |
| auto child1 = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, |
| Literal(FieldType::STRING, "Lucy", 4)); |
| auto child2 = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, |
| Literal(10)); |
| ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-414509f5-e40c-4245-b992-bbf486778ac9-0.orc", /*file_size=*/689, |
| /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(), |
| /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 0, 11.1}, |
| {std::string("Tony"), 20, 1, 18.1}, {0, 0, 1, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, |
| /*max_sequence_number=*/7, /*schema_id=*/0, |
| /*level=*/0, |
| /*extra_files=*/ |
| std::vector<std::optional<std::string>>( |
| {"data-414509f5-e40c-4245-b992-bbf486778ac9-0.orc.index"}), |
| /*creation_time=*/Timestamp(1745235371029ll, 0), /*delete_row_count=*/0, |
| /*embedded_index=*/nullptr, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*bucket_path=*/table_path + "bucket-0", {file_meta}); |
| ASSERT_OK_AND_ASSIGN(auto expected_data_split, builder.WithTotalBuckets(-1) |
| .WithSnapshot(1) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build()); |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithBitmapAndAlterTable) { |
| std::string table_path = |
| paimon::test::GetDataDir() + |
| "orc/append_with_bitmap_alter_table.db/append_with_bitmap_alter_table/"; |
| ScanContextBuilder context_builder(table_path); |
| // file0 will be removed as f5 not exists |
| auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f5", |
| FieldType::INT, Literal(100)); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| std::vector<uint8_t> embedded_bytes = { |
| 0, 5, 78, 78, 208, 26, 53, 174, 0, 0, 0, 1, 0, 0, 0, 96, 0, 0, |
| 0, 3, 0, 2, 102, 49, 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, |
| 0, 0, 0, 96, 0, 0, 0, 102, 0, 2, 102, 52, 0, 0, 0, 1, 0, 6, |
| 98, 105, 116, 109, 97, 112, 0, 0, 0, 198, 0, 0, 0, 101, 0, 2, 102, 53, |
| 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, 0, 0, 1, 43, 0, 0, |
| 0, 82, 0, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0, 0, 3, 0, 0, 0, |
| 0, 1, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0, 52, |
| 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 10, 255, 255, 255, 253, 255, 255, |
| 255, 255, 0, 0, 0, 0, 0, 0, 0, 20, 255, 255, 255, 254, 255, 255, 255, 255, |
| 0, 0, 0, 0, 0, 0, 0, 30, 0, 0, 0, 0, 0, 0, 0, 20, 58, 48, |
| 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 3, 0, |
| 2, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 0, 1, 0, 0, 0, 5, |
| 65, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, 70, 0, 0, 0, 4, 0, |
| 0, 0, 5, 65, 108, 105, 99, 101, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, |
| 0, 3, 66, 111, 98, 255, 255, 255, 253, 255, 255, 255, 255, 0, 0, 0, 5, 68, |
| 97, 118, 105, 100, 255, 255, 255, 252, 255, 255, 255, 255, 0, 0, 0, 5, 69, 109, |
| 105, 108, 121, 255, 255, 255, 254, 255, 255, 255, 255, 2, 0, 0, 0, 4, 0, 0, |
| 0, 2, 1, 255, 255, 255, 252, 0, 0, 0, 18, 0, 0, 0, 1, 0, 0, 0, |
| 100, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, 0, 100, 0, |
| 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 101, 255, 255, 255, 254, 255, 255, 255, |
| 255, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, |
| 0, 2, 0}; |
| auto embedded_index = std::make_shared<Bytes>(embedded_bytes.size(), pool_.get()); |
| memcpy(embedded_index->data(), reinterpret_cast<const void*>(embedded_bytes.data()), |
| embedded_bytes.size()); |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-a29b7235-760d-4838-881c-39cbef585dd2-0.orc", /*file_size=*/666, |
| /*row_count=*/4, /*min_key=*/BinaryRow::EmptyRow(), |
| /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({10, std::string("Alice"), 21.1, 100}, |
| {30, std::string("Emily"), 24.1, 101}, {0, 0, 0, 1}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/3, /*schema_id=*/1, /*level=*/0, |
| /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1745253323731ll, 0), /*delete_row_count=*/0, |
| /*embedded_index=*/embedded_index, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| |
| DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*bucket_path=*/table_path + "bucket-0", {file_meta}); |
| ASSERT_OK_AND_ASSIGN(auto expected_data_split, builder.WithTotalBuckets(-1) |
| .WithSnapshot(2) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build()); |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithBitmapAndAlterTable3) { |
| std::string table_path = |
| paimon::test::GetDataDir() + |
| "orc/append_with_bitmap_alter_table.db/append_with_bitmap_alter_table/"; |
| ScanContextBuilder context_builder(table_path); |
| auto child1 = PredicateBuilder::IsNull(/*field_index=*/3, /*field_name=*/"f5", FieldType::INT); |
| auto child2 = PredicateBuilder::LessThan(/*field_index=*/2, /*field_name=*/"f3", |
| FieldType::DOUBLE, Literal(20.0)); |
| ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| std::vector<uint8_t> embedded_bytes = { |
| 0, 5, 78, 78, 208, 26, 53, 174, 0, 0, 0, 1, 0, 0, 0, 96, 0, 0, |
| 0, 3, 0, 2, 102, 48, 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, |
| 0, 0, 0, 96, 0, 0, 0, 176, 0, 2, 102, 49, 0, 0, 0, 1, 0, 6, |
| 98, 105, 116, 109, 97, 112, 0, 0, 1, 16, 0, 0, 0, 102, 0, 2, 102, 50, |
| 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, 0, 0, 1, 118, 0, 0, |
| 0, 108, 0, 0, 0, 0, 2, 0, 0, 0, 8, 0, 0, 0, 5, 0, 0, 0, |
| 0, 1, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, |
| 85, 0, 0, 0, 5, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, |
| 0, 0, 0, 20, 0, 0, 0, 3, 66, 111, 98, 0, 0, 0, 20, 0, 0, 0, |
| 20, 0, 0, 0, 5, 69, 109, 105, 108, 121, 255, 255, 255, 253, 255, 255, 255, 255, |
| 0, 0, 0, 4, 76, 117, 99, 121, 255, 255, 255, 251, 255, 255, 255, 255, 0, 0, |
| 0, 4, 84, 111, 110, 121, 0, 0, 0, 40, 0, 0, 0, 20, 58, 48, 0, 0, |
| 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 7, 0, 58, 48, |
| 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 1, 0, 5, 0, |
| 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 3, 0, |
| 6, 0, 2, 0, 0, 0, 8, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, |
| 0, 10, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, 0, 10, |
| 0, 0, 0, 22, 0, 0, 0, 26, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, |
| 0, 22, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, 0, 0, |
| 4, 0, 6, 0, 7, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 4, 0, |
| 16, 0, 0, 0, 0, 0, 1, 0, 2, 0, 3, 0, 5, 0, 2, 0, 0, 0, |
| 8, 0, 0, 0, 2, 1, 255, 255, 255, 248, 0, 0, 0, 18, 0, 0, 0, 1, |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 0, 0, 0, 1, 0, 0, 0, 22, |
| 0, 0, 0, 24, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, |
| 0, 0, 2, 0, 3, 0, 6, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, |
| 3, 0, 16, 0, 0, 0, 0, 0, 1, 0, 4, 0, 5, 0}; |
| auto embedded_index = std::make_shared<Bytes>(embedded_bytes.size(), pool_.get()); |
| memcpy(embedded_index->data(), reinterpret_cast<const void*>(embedded_bytes.data()), |
| embedded_bytes.size()); |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-68014988-5451-478f-a18a-a1668214cf3d-0.orc", /*file_size=*/689, |
| /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(), |
| /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 0, 11.1}, |
| {std::string("Tony"), 20, 1, 18.1}, {0, 0, 1, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/7, /*schema_id=*/0, /*level=*/0, |
| /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1745251357742ll, 0), /*delete_row_count=*/0, |
| /*embedded_index=*/embedded_index, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| |
| DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*bucket_path=*/table_path + "bucket-0", {file_meta}); |
| ASSERT_OK_AND_ASSIGN(auto expected_data_split, builder.WithTotalBuckets(-1) |
| .WithSnapshot(2) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build()); |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithBitmapAndAlterTable2) { |
| std::string table_path = |
| paimon::test::GetDataDir() + |
| "orc/append_with_bitmap_alter_table.db/append_with_bitmap_alter_table/"; |
| ScanContextBuilder context_builder(table_path); |
| |
| // in stats filter: predicate is trimmed as type for f1 is not consist: int->bigint |
| // in index filter: predicate is removed as type for f1 is converted bigint->int, which is not |
| // safe |
| auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, /*field_name=*/"f1", |
| FieldType::BIGINT, Literal(100l)); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| |
| // check data splits |
| auto result_data_splits = CollectDataSplits(result_plan); |
| std::vector<uint8_t> embedded_bytes = { |
| 0, 5, 78, 78, 208, 26, 53, 174, 0, 0, 0, 1, 0, 0, 0, 96, 0, 0, |
| 0, 3, 0, 2, 102, 48, 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, |
| 0, 0, 0, 96, 0, 0, 0, 176, 0, 2, 102, 49, 0, 0, 0, 1, 0, 6, |
| 98, 105, 116, 109, 97, 112, 0, 0, 1, 16, 0, 0, 0, 102, 0, 2, 102, 50, |
| 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, 0, 0, 1, 118, 0, 0, |
| 0, 108, 0, 0, 0, 0, 2, 0, 0, 0, 8, 0, 0, 0, 5, 0, 0, 0, |
| 0, 1, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, |
| 85, 0, 0, 0, 5, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, |
| 0, 0, 0, 20, 0, 0, 0, 3, 66, 111, 98, 0, 0, 0, 20, 0, 0, 0, |
| 20, 0, 0, 0, 5, 69, 109, 105, 108, 121, 255, 255, 255, 253, 255, 255, 255, 255, |
| 0, 0, 0, 4, 76, 117, 99, 121, 255, 255, 255, 251, 255, 255, 255, 255, 0, 0, |
| 0, 4, 84, 111, 110, 121, 0, 0, 0, 40, 0, 0, 0, 20, 58, 48, 0, 0, |
| 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 7, 0, 58, 48, |
| 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 1, 0, 5, 0, |
| 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 3, 0, |
| 6, 0, 2, 0, 0, 0, 8, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, |
| 0, 10, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, 0, 10, |
| 0, 0, 0, 22, 0, 0, 0, 26, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, |
| 0, 22, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, 0, 0, |
| 4, 0, 6, 0, 7, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 4, 0, |
| 16, 0, 0, 0, 0, 0, 1, 0, 2, 0, 3, 0, 5, 0, 2, 0, 0, 0, |
| 8, 0, 0, 0, 2, 1, 255, 255, 255, 248, 0, 0, 0, 18, 0, 0, 0, 1, |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 0, 0, 0, 1, 0, 0, 0, 22, |
| 0, 0, 0, 24, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, |
| 0, 0, 2, 0, 3, 0, 6, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, |
| 3, 0, 16, 0, 0, 0, 0, 0, 1, 0, 4, 0, 5, 0}; |
| auto embedded_index = std::make_shared<Bytes>(embedded_bytes.size(), pool_.get()); |
| memcpy(embedded_index->data(), reinterpret_cast<const void*>(embedded_bytes.data()), |
| embedded_bytes.size()); |
| auto file_meta = std::make_shared<DataFileMeta>( |
| "data-68014988-5451-478f-a18a-a1668214cf3d-0.orc", /*file_size=*/689, |
| /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(), |
| /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), |
| /*value_stats=*/ |
| BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 0, 11.1}, |
| {std::string("Tony"), 20, 1, 18.1}, {0, 0, 1, 0}, |
| pool_.get()), |
| /*min_sequence_number=*/0, /*max_sequence_number=*/7, /*schema_id=*/0, /*level=*/0, |
| /*extra_files=*/std::vector<std::optional<std::string>>(), |
| /*creation_time=*/Timestamp(1745251357742ll, 0), /*delete_row_count=*/0, |
| /*embedded_index=*/embedded_index, FileSource::Append(), |
| /*value_stats_cols=*/std::nullopt, |
| /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); |
| |
| DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0, |
| /*bucket_path=*/table_path + "bucket-0", {file_meta}); |
| ASSERT_OK_AND_ASSIGN(auto expected_data_split, builder.WithTotalBuckets(-1) |
| .WithSnapshot(2) |
| .IsStreaming(false) |
| .RawConvertible(true) |
| .Build()); |
| std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {expected_data_split}; |
| CheckResult(expected_data_splits, result_data_splits); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithBitmapAndAlterTableWithEmptyResult) { |
| std::string table_path = |
| paimon::test::GetDataDir() + |
| "orc/append_with_bitmap_alter_table.db/append_with_bitmap_alter_table/"; |
| ScanContextBuilder context_builder(table_path); |
| |
| // child1 will remove file1 for schema1 |
| auto child1 = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f1", FieldType::BIGINT, |
| Literal(100l)); |
| // child2 will remove file0 for schema 0 |
| auto child2 = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f4", FieldType::STRING, |
| Literal(FieldType::STRING, "David", 5)); |
| ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); |
| |
| context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| ASSERT_TRUE(result_plan->Splits().empty()); |
| } |
| |
| TEST_F(ScanInteTest, TestScanAppendWithTag1) { |
| std::string table_path = |
| paimon::test::GetDataDir() + "orc/append_table_with_tag.db/append_table_with_tag"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_TAG_NAME, "1"); |
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<ScanContext> scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); |
| // check snapshot id |
| ASSERT_EQ(1, result_plan->SnapshotId().value()); |
| } |
| |
| TEST_F(ScanInteTest, TestScanInvalidTag) { |
| std::string table_path = |
| paimon::test::GetDataDir() + "orc/append_table_with_tag.db/append_table_with_tag"; |
| ScanContextBuilder context_builder(table_path); |
| context_builder.AddOption(Options::SCAN_TAG_NAME, "unknown"); |
| ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); |
| ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); |
| ASSERT_NOK_WITH_MSG(table_scan->CreatePlan(), "Tag 'unknown' doesn't exist."); |
| } |
| |
| } // namespace paimon::test |