| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| """ |
| import logging |
| import time |
| from datetime import date, datetime |
| from decimal import Decimal |
| from unittest.mock import Mock |
| |
| import numpy as np |
| import pandas as pd |
| import pyarrow as pa |
| |
| from pypaimon import CatalogFactory, Schema |
| from pypaimon.api.options import Options |
| from pypaimon.catalog.catalog_context import CatalogContext |
| from pypaimon.catalog.rest.rest_catalog import RESTCatalog |
| from pypaimon.common.core_options import CoreOptions |
| from pypaimon.common.identifier import Identifier |
| from pypaimon.manifest.manifest_file_manager import ManifestFileManager |
| from pypaimon.manifest.schema.data_file_meta import DataFileMeta |
| from pypaimon.manifest.schema.manifest_entry import ManifestEntry |
| from pypaimon.manifest.schema.simple_stats import SimpleStats |
| from pypaimon.schema.data_types import AtomicType, DataField |
| from pypaimon.snapshot.snapshot_manager import SnapshotManager |
| from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer, |
| GenericRowSerializer) |
| from pypaimon.table.row.row_kind import RowKind |
| from pypaimon.tests.py36.pyarrow_compat import table_sort_by |
| from pypaimon.tests.rest.rest_base_test import RESTBaseTest |
| from pypaimon.write.file_store_commit import FileStoreCommit |
| |
| |
| class RESTAOReadWritePy36Test(RESTBaseTest): |
| |
| def test_overwrite(self): |
| simple_pa_schema = pa.schema([ |
| ('f0', pa.int32()), |
| ('f1', pa.string()) |
| ]) |
| schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'], |
| options={'dynamic-partition-overwrite': 'false'}) |
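| # With 'dynamic-partition-overwrite' disabled, overwrite() with no partition spec |
| # replaces the whole table, while overwrite({'f0': 1}) replaces only that partition. |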
| self.rest_catalog.create_table('default.test_overwrite', schema, False) |
| table = self.rest_catalog.get_table('default.test_overwrite') |
| read_builder = table.new_read_builder() |
| |
| # test normal write |
| write_builder = table.new_batch_write_builder() |
| table_write = write_builder.new_write() |
| table_commit = write_builder.new_commit() |
| |
| df0 = pd.DataFrame({ |
| 'f0': [1, 2], |
| 'f1': ['apple', 'banana'], |
| }) |
| |
| table_write.write_pandas(df0) |
| table_commit.commit(table_write.prepare_commit()) |
| table_write.close() |
| table_commit.close() |
| |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0') |
| df0['f0'] = df0['f0'].astype('int32') |
| pd.testing.assert_frame_equal( |
| actual_df0.reset_index(drop=True), df0.reset_index(drop=True)) |
| |
| # test partial overwrite of partition f0=1 |
| write_builder = table.new_batch_write_builder().overwrite({'f0': 1}) |
| table_write = write_builder.new_write() |
| table_commit = write_builder.new_commit() |
| |
| df1 = pd.DataFrame({ |
| 'f0': [1], |
| 'f1': ['watermelon'], |
| }) |
| |
| table_write.write_pandas(df1) |
| table_commit.commit(table_write.prepare_commit()) |
| table_write.close() |
| table_commit.close() |
| |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0') |
| expected_df1 = pd.DataFrame({ |
| 'f0': [1, 2], |
| 'f1': ['watermelon', 'banana'] |
| }) |
| expected_df1['f0'] = expected_df1['f0'].astype('int32') |
| pd.testing.assert_frame_equal( |
| actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True)) |
| |
| # test full overwrite of the whole table |
| write_builder = table.new_batch_write_builder().overwrite() |
| table_write = write_builder.new_write() |
| table_commit = write_builder.new_commit() |
| |
| df2 = pd.DataFrame({ |
| 'f0': [3], |
| 'f1': ['Neo'], |
| }) |
| |
| table_write.write_pandas(df2) |
| table_commit.commit(table_write.prepare_commit()) |
| table_write.close() |
| table_commit.close() |
| |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| actual_df2 = table_read.to_pandas(table_scan.plan().splits()) |
| df2['f0'] = df2['f0'].astype('int32') |
| pd.testing.assert_frame_equal( |
| actual_df2.reset_index(drop=True), df2.reset_index(drop=True)) |
| |
| def test_full_data_types(self): |
| simple_pa_schema = pa.schema([ |
| ('f0', pa.int8()), |
| ('f1', pa.int16()), |
| ('f2', pa.int32()), |
| ('f3', pa.int64()), |
| ('f4', pa.float32()), |
| ('f5', pa.float64()), |
| ('f6', pa.bool_()), |
| ('f7', pa.string()), |
| ('f8', pa.binary()), |
| ('f9', pa.binary(10)), |
| ('f10', pa.decimal128(10, 2)), |
| ('f11', pa.date32()), |
| ]) |
| schema = Schema.from_pyarrow_schema(simple_pa_schema) |
| self.rest_catalog.create_table('default.test_full_data_types', schema, False) |
| table = self.rest_catalog.get_table('default.test_full_data_types') |
| |
| # write the data and read it back |
| write_builder = table.new_batch_write_builder() |
| table_write = write_builder.new_write() |
| table_commit = write_builder.new_commit() |
| expect_data = pa.Table.from_pydict({ |
| 'f0': [-1, 2], |
| 'f1': [-1001, 1002], |
| 'f2': [-1000001, 1000002], |
| 'f3': [-10000000001, 10000000002], |
| 'f4': [-1001.05, 1002.05], |
| 'f5': [-1000001.05, 1000002.05], |
| 'f6': [False, True], |
| 'f7': ['Hello', 'World'], |
| 'f8': [b'\x01\x02\x03', b'pyarrow'], |
| 'f9': [b'exactly_10', b'pad'.ljust(10, b'\x00')], |
| 'f10': [Decimal('-987.65'), Decimal('12345.67')], |
| 'f11': [date(1999, 12, 31), date(2023, 1, 1)], |
| }, schema=simple_pa_schema) |
| table_write.write_arrow(expect_data) |
| table_commit.commit(table_write.prepare_commit()) |
| table_write.close() |
| table_commit.close() |
| |
| read_builder = table.new_read_builder() |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| actual_data = table_read.to_arrow(table_scan.plan().splits()) |
| self.assertEqual(actual_data, expect_data) |
| |
| # verify that the min/max value stats round-trip through GenericRow deserialization |
| latest_snapshot = SnapshotManager(table).get_latest_snapshot() |
| manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) |
| manifest_entries = table_scan.starting_scanner.manifest_file_manager.read( |
| manifest_files[0].file_name, |
| lambda row: table_scan.starting_scanner._filter_manifest_entry(row), |
| drop_stats=False) |
| min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data, |
| table.fields).values |
| max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data, |
| table.fields).values |
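| # Iterating a pyarrow Table yields its columns; element 0/1 of each column is row 0/1 |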
| expected_min_values = [col[0].as_py() for col in expect_data] |
| expected_max_values = [col[1].as_py() for col in expect_data] |
| self.assertEqual(min_value_stats, expected_min_values) |
| self.assertEqual(max_value_stats, expected_max_values) |
| |
| def test_mixed_add_and_delete_entries_same_partition(self): |
| """Test record_count calculation with mixed ADD/DELETE entries in same partition.""" |
| pa_schema = pa.schema([ |
| ('region', pa.string()), |
| ('city', pa.string()) |
| ]) |
| schema = Schema.from_pyarrow_schema(pa_schema) |
| self.rest_catalog.create_table('default.tb', schema, False) |
| table = self.rest_catalog.get_table('default.tb') |
| partition_fields = [ |
| DataField(0, "region", AtomicType("STRING")), |
| DataField(1, "city", AtomicType("STRING")) |
| ] |
| partition = GenericRow(['East', 'Boston'], partition_fields) |
| |
| # Create ADD entry |
| add_file_meta = Mock(spec=DataFileMeta) |
| add_file_meta.row_count = 200 |
| add_file_meta.file_size = 2048 |
| add_file_meta.creation_time = datetime.now() |
| |
| add_entry = ManifestEntry( |
| kind=0, # ADD |
| partition=partition, |
| bucket=0, |
| total_buckets=1, |
| file=add_file_meta |
| ) |
| |
| # Create DELETE entry |
| delete_file_meta = Mock(spec=DataFileMeta) |
| delete_file_meta.row_count = 80 |
| delete_file_meta.file_size = 800 |
| delete_file_meta.creation_time = datetime.now() |
| |
| delete_entry = ManifestEntry( |
| kind=1, # DELETE |
| partition=partition, |
| bucket=0, |
| total_buckets=1, |
| file=delete_file_meta |
| ) |
| file_store_commit = FileStoreCommit(None, table, "") |
| # Test the method with both entries |
| statistics = file_store_commit._generate_partition_statistics([add_entry, delete_entry]) |
| |
| # Verify results - both entries should merge into a single partition statistics entry |
| self.assertEqual(len(statistics), 1) |
| stat = statistics[0] |
| |
| # Net record count: +200 + (-80) = 120 |
| self.assertEqual(stat.record_count, 120) |
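| # Net file count: +1 (ADD) + (-1) (DELETE) = 0; net file size: +2048 + (-800) = 1248 |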
| self.assertEqual(stat.file_count, 0) |
| self.assertEqual(stat.file_size_in_bytes, 1248) |
| |
| def test_multiple_partitions_with_different_operations(self): |
| """Test record_count calculation across multiple partitions.""" |
| pa_schema = pa.schema([ |
| ('region', pa.string()), |
| ('city', pa.string()) |
| ]) |
| schema = Schema.from_pyarrow_schema(pa_schema) |
| self.rest_catalog.create_table('default.tb1', schema, False) |
| table = self.rest_catalog.get_table('default.tb1') |
| partition_fields = [ |
| DataField(0, "region", AtomicType("STRING")), |
| DataField(1, "city", AtomicType("STRING")) |
| ] |
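| # Partition 1: East/Boston - ADD operation |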
| partition1 = GenericRow(['East', 'Boston'], partition_fields) |
| file_meta1 = Mock(spec=DataFileMeta) |
| file_meta1.row_count = 150 |
| file_meta1.file_size = 1500 |
| file_meta1.creation_time = datetime.now() |
| |
| entry1 = ManifestEntry( |
| kind=0, # ADD |
| partition=partition1, |
| bucket=0, |
| total_buckets=1, |
| file=file_meta1 |
| ) |
| |
| # Partition 2: South/LA - DELETE operation |
| partition2 = GenericRow(['South', 'LA'], partition_fields) |
| file_meta2 = Mock(spec=DataFileMeta) |
| file_meta2.row_count = 75 |
| file_meta2.file_size = 750 |
| file_meta2.creation_time = datetime.now() |
| |
| entry2 = ManifestEntry( |
| kind=1, # DELETE |
| partition=partition2, |
| bucket=0, |
| total_buckets=1, |
| file=file_meta2 |
| ) |
| |
| file_store_commit = FileStoreCommit(None, table, "") |
| # Test the method with both entries |
| statistics = file_store_commit._generate_partition_statistics([entry1, entry2]) |
| |
| # Verify results - should have 2 separate partition statistics |
| self.assertEqual(len(statistics), 2) |
| |
| # Sort by partition spec for consistent testing |
| statistics.sort(key=lambda s: (s.spec.get('region', ''), s.spec.get('city', ''))) |
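| # 'East' sorts before 'South', so East/Boston comes first |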
| |
| # Check East/Boston partition (ADD) |
| north_stat = statistics[0] |
| self.assertEqual(north_stat.record_count, 150) # Positive for ADD |
| self.assertEqual(north_stat.file_count, 1) |
| self.assertEqual(north_stat.file_size_in_bytes, 1500) |
| |
| # Check South/LA partition (DELETE) |
| south_stat = statistics[1] |
| self.assertEqual(south_stat.record_count, -75) # Negative for DELETE |
| self.assertEqual(south_stat.file_count, -1) |
| self.assertEqual(south_stat.file_size_in_bytes, -750) |
| |
| def test_parquet_append_only_reader(self): |
| schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) |
| self.rest_catalog.create_table('default.test_append_only_parquet', schema, False) |
| table = self.rest_catalog.get_table('default.test_append_only_parquet') |
| self._write_test_table(table) |
| |
| read_builder = table.new_read_builder() |
| actual = table_sort_by(self._read_test_table(read_builder), 'user_id') |
| self.assertEqual(actual, self.expected) |
| |
| def test_orc_append_only_reader(self): |
| schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'orc'}) |
| self.rest_catalog.create_table('default.test_append_only_orc', schema, False) |
| table = self.rest_catalog.get_table('default.test_append_only_orc') |
| self._write_test_table(table) |
| |
| read_builder = table.new_read_builder() |
| actual = table_sort_by(self._read_test_table(read_builder), 'user_id') |
| self.assertEqual(actual, self.expected) |
| |
| def test_avro_append_only_reader(self): |
| schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'}) |
| self.rest_catalog.create_table('default.test_append_only_avro', schema, False) |
| table = self.rest_catalog.get_table('default.test_append_only_avro') |
| self._write_test_table(table) |
| |
| read_builder = table.new_read_builder() |
| actual = table_sort_by(self._read_test_table(read_builder), 'user_id') |
| self.assertEqual(actual, self.expected) |
| |
| def test_append_only_multi_write_once_commit(self): |
| schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) |
| self.rest_catalog.create_table('default.test_append_only_multi_once_commit', schema, False) |
| table = self.rest_catalog.get_table('default.test_append_only_multi_once_commit') |
| write_builder = table.new_batch_write_builder() |
| |
| table_write = write_builder.new_write() |
| table_commit = write_builder.new_commit() |
| data1 = { |
| 'user_id': [1, 2, 3, 4], |
| 'item_id': [1001, 1002, 1003, 1004], |
| 'behavior': ['a', 'b', 'c', None], |
| 'dt': ['p1', 'p1', 'p2', 'p1'], |
| 'long-dt': ['2024-10-10', '2024-10-10', '2024-10-10', '2024-01-01'], |
| } |
| pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema) |
| data2 = { |
| 'user_id': [5, 6, 7, 8], |
| 'item_id': [1005, 1006, 1007, 1008], |
| 'behavior': ['e', 'f', 'g', 'h'], |
| 'dt': ['p2', 'p1', 'p2', 'p2'], |
| 'long-dt': ['2024-10-10', '2025-01-23', 'abcdefghijklmnopk', '2025-08-08'], |
| } |
| pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema) |
| |
| table_write.write_arrow(pa_table1) |
| table_write.write_arrow(pa_table2) |
| |
| table_commit.commit(table_write.prepare_commit()) |
| table_write.close() |
| table_commit.close() |
| |
| read_builder = table.new_read_builder() |
| actual = table_sort_by(self._read_test_table(read_builder), 'user_id') |
| self.assertEqual(actual, self.expected) |
| |
| def test_over1000cols_read(self): |
| num_rows = 1 |
| num_cols = 10 |
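| # NOTE: the column count here is far below the "over 1000 cols" the test name suggests, |
| # presumably scaled down for speed; raise num_cols to exercise a truly wide schema. |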
| table_name = "default.testBug" |
| # Generate dynamic schema based on column count |
| schema_fields = [] |
| for i in range(1, num_cols + 1): |
| col_name = f'c{i:03d}' |
| if i == 1: |
| schema_fields.append((col_name, pa.string())) # ID column |
| elif i == 2: |
| schema_fields.append((col_name, pa.string())) # Name column |
| elif i == 3: |
| schema_fields.append((col_name, pa.string())) # Category column (partition key) |
| elif i % 4 == 0: |
| schema_fields.append((col_name, pa.float64())) # Float columns |
| elif i % 4 == 1: |
| schema_fields.append((col_name, pa.int32())) # Int columns |
| elif i % 4 == 2: |
| schema_fields.append((col_name, pa.string())) # String columns |
| else: |
| schema_fields.append((col_name, pa.int64())) # Long columns |
| |
| pa_schema = pa.schema(schema_fields) |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| partition_keys=['c003'], # Use c003 as partition key |
| ) |
| |
| # Create table |
| self.rest_catalog.create_table(table_name, schema, False) |
| table = self.rest_catalog.get_table(table_name) |
| |
| # Generate test data |
| np.random.seed(42) # For reproducible results |
| categories = ['Electronics', 'Clothing', 'Books', 'Home', 'Sports', 'Food', 'Toys', 'Beauty', 'Health', 'Auto'] |
| statuses = ['Active', 'Inactive', 'Pending', 'Completed'] |
| |
| # Generate data dictionary |
| test_data = {} |
| for i in range(1, num_cols + 1): |
| col_name = f'c{i:03d}' |
| if i == 1: |
| test_data[col_name] = [f'Product_{j}' for j in range(1, num_rows + 1)] |
| elif i == 2: |
| test_data[col_name] = [f'Product_{j}' for j in range(1, num_rows + 1)] |
| elif i == 3: |
| test_data[col_name] = np.random.choice(categories, num_rows) |
| elif i % 4 == 0: |
| test_data[col_name] = np.random.uniform(1.0, 1000.0, num_rows).round(2) |
| elif i % 4 == 1: |
| test_data[col_name] = np.random.randint(1, 100, num_rows) |
| elif i % 4 == 2: |
| test_data[col_name] = np.random.choice(statuses, num_rows) |
| else: |
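| # epoch seconds between 2022-01-01 and 2023-01-01 (UTC) |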
| test_data[col_name] = np.random.randint(1640995200, 1672531200, num_rows) |
| |
| test_df = pd.DataFrame(test_data) |
| |
| write_builder = table.new_batch_write_builder() |
| table_write = write_builder.new_write() |
| table_commit = write_builder.new_commit() |
| |
| table_write.write_pandas(test_df) |
| table_commit.commit(table_write.prepare_commit()) |
| table_write.close() |
| table_commit.close() |
| |
| read_builder = table.new_read_builder() |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| result = table_read.to_pandas(table_scan.plan().splits()) |
| self.assertEqual(result.to_dict(), test_df.to_dict()) |
| |
| def test_append_only_reader_with_filter(self): |
| schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) |
| self.rest_catalog.create_table('default.test_append_only_filter', schema, False) |
| table = self.rest_catalog.get_table('default.test_append_only_filter') |
| self._write_test_table(table) |
| |
| predicate_builder = table.new_read_builder().new_predicate_builder() |
| p1 = predicate_builder.less_than('user_id', 7) |
| p2 = predicate_builder.greater_or_equal('user_id', 2) |
| p3 = predicate_builder.between('user_id', 0, 6) # [2/b, 3/c, 4/None, 5/e, 6/f] left |
| p4 = predicate_builder.is_not_in('behavior', ['b', 'e']) # [3/c, 4/None, 6/f] left |
| p5 = predicate_builder.is_in('dt', ['p1']) # exclude 3/c |
| p6 = predicate_builder.is_not_null('behavior') # exclude 4/None |
| g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6]) |
| read_builder = table.new_read_builder().with_filter(g1) |
| actual = self._read_test_table(read_builder) |
| expected = pa.concat_tables([ |
| self.expected.slice(5, 1) # 6/f |
| ]) |
| self.assertEqual(table_sort_by(actual, 'user_id'), expected) |
| |
| p7 = predicate_builder.startswith('behavior', 'a') |
| p8 = predicate_builder.equal('item_id', 1002) |
| p9 = predicate_builder.is_null('behavior') |
| g2 = predicate_builder.or_predicates([p7, p8, p9]) |
| read_builder = table.new_read_builder().with_filter(g2) |
| actual = self._read_test_table(read_builder) |
| expected = pa.concat_tables([ |
| self.expected.slice(0, 1), # 1/a |
| self.expected.slice(1, 1), # 2/b |
| self.expected.slice(3, 1), # 4/None (matched by is_null) |
| ]) |
| self.assertEqual(table_sort_by(actual, 'user_id'), expected) |
| |
| # As in Java, 'not_equal' also filters out records whose value is None |
| p12 = predicate_builder.not_equal('behavior', 'f') |
| read_builder = table.new_read_builder().with_filter(p12) |
| actual = self._read_test_table(read_builder) |
| expected = pa.concat_tables([ |
| # not only 6/f but also 4/None is filtered out |
| self.expected.slice(0, 1), # 1/a |
| self.expected.slice(1, 1), # 2/b |
| self.expected.slice(2, 1), # 3/c |
| self.expected.slice(4, 1), # 5/e |
| self.expected.slice(6, 1), # 7/g |
| self.expected.slice(7, 1), # 8/h |
| ]) |
| self.assertEqual(table_sort_by(actual, 'user_id'), expected) |
| |
| def test_append_only_reader_with_projection(self): |
| schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) |
| self.rest_catalog.create_table('default.test_append_only_projection', schema, False) |
| table = self.rest_catalog.get_table('default.test_append_only_projection') |
| self._write_test_table(table) |
| |
| read_builder = table.new_read_builder().with_projection(['dt', 'user_id']) |
| actual = table_sort_by(self._read_test_table(read_builder), 'user_id') |
| expected = self.expected.select(['dt', 'user_id']) |
| self.assertEqual(actual, expected) |
| |
| def test_avro_append_only_reader_with_projection(self): |
| schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'}) |
| self.rest_catalog.create_table('default.test_avro_append_only_projection', schema, False) |
| table = self.rest_catalog.get_table('default.test_avro_append_only_projection') |
| self._write_test_table(table) |
| |
| read_builder = table.new_read_builder().with_projection(['dt', 'user_id']) |
| actual = table_sort_by(self._read_test_table(read_builder), 'user_id') |
| expected = self.expected.select(['dt', 'user_id']) |
| self.assertEqual(actual, expected) |
| |
| def test_append_only_reader_with_limit(self): |
| schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) |
| self.rest_catalog.create_table('default.test_append_only_limit', schema, False) |
| table = self.rest_catalog.get_table('default.test_append_only_limit') |
| self._write_test_table(table) |
| |
| read_builder = table.new_read_builder().with_limit(1) |
| actual = self._read_test_table(read_builder) |
| # only records from the first split (a single commit) are read |
| # it might be the split for "dt=p1" or the one for "dt=p2" |
| self.assertEqual(actual.num_rows, 4) |
| |
| def test_write_wrong_schema(self): |
| self.rest_catalog.create_table('default.test_wrong_schema', |
| Schema.from_pyarrow_schema(self.pa_schema), |
| False) |
| table = self.rest_catalog.get_table('default.test_wrong_schema') |
| |
| data = { |
| 'f0': [1, 2, 3], |
| 'f1': ['a', 'b', 'c'], |
| } |
| df = pd.DataFrame(data) |
| schema = pa.schema([ |
| ('f0', pa.int64()), |
| ('f1', pa.string()) |
| ]) |
| record_batch = pa.RecordBatch.from_pandas(df, schema) |
| |
| write_builder = table.new_batch_write_builder() |
| table_write = write_builder.new_write() |
| |
| with self.assertRaises(ValueError) as e: |
| table_write.write_arrow_batch(record_batch) |
| self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema and write cols.")) |
| |
| def test_write_wide_table_large_data(self): |
| logging.basicConfig(level=logging.INFO) |
| catalog = CatalogFactory.create(self.options) |
| |
| # Build table structure: 200 data columns + 1 partition column |
| # Create PyArrow schema |
| pa_fields = [] |
| |
| # Create 200 data columns f0 to f199 |
| for i in range(200): |
| pa_fields.append(pa.field(f"f{i}", pa.string(), metadata={"description": f"Column f{i}"})) |
| |
| # Add partition column dt |
| pa_fields.append(pa.field("dt", pa.string(), metadata={"description": "Partition column dt"})) |
| |
| # Create PyArrow schema |
| pa_schema = pa.schema(pa_fields) |
| |
| # Convert to Paimon Schema and specify partition key |
| schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"]) |
| |
| # Create table |
| table_identifier = Identifier.create("default", "wide_table_200cols") |
| try: |
| # If table already exists, drop it first |
| try: |
| catalog.get_table(table_identifier) |
| catalog.drop_table(table_identifier) |
| print(f"Dropped existing table {table_identifier}") |
| except Exception: |
| # Table does not exist, continue creating |
| pass |
| |
| # Create new table |
| catalog.create_table( |
| identifier=table_identifier, |
| schema=schema, |
| ignore_if_exists=False |
| ) |
| |
| print( |
| f"Successfully created table {table_identifier} with {len(pa_fields) - 1} " |
| f"data columns and 1 partition column") |
| print( |
| f"Table schema: {len([f for f in pa_fields if f.name != 'dt'])} data columns (f0-f199) + dt partition") |
| |
| except Exception as e: |
| print(f"Error creating table: {e}") |
| raise e |
| import random |
| |
| table_identifier = Identifier.create("default", "wide_table_200cols") |
| table = catalog.get_table(table_identifier) |
| |
| total_rows = 500000 # rows of data |
| batch_size = 100000 # 100,000 rows per batch |
| commit_batches = total_rows // batch_size |
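| # 500,000 rows / 100,000 rows per batch = 5 separate commits |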
| |
| for commit_batch in range(commit_batches): |
| start_idx = commit_batch * batch_size |
| end_idx = start_idx + batch_size |
| |
| print(f"Processing batch {commit_batch + 1}/{commit_batches} ({start_idx:,} - {end_idx:,})...") |
| # Generate data for the current batch (all 200 columns) |
| data = {} |
| # Generate data for f0-f199 |
| for i in range(200): |
| if i == 0: |
| data[f"f{i}"] = [f'value_{j}' for j in range(start_idx, end_idx)] |
| elif i == 1: |
| data[f"f{i}"] = [random.choice(['A', 'B', 'C', 'D', 'E']) for _ in range(batch_size)] |
| elif i == 2: |
| data[f"f{i}"] = [f'detail_{random.randint(1, 1000)}' for _ in range(batch_size)] |
| elif i == 3: |
| data[f"f{i}"] = [f'id_{j:06d}' for j in range(start_idx, end_idx)] |
| else: |
| # Generate random string data for other columns |
| data[f"f{i}"] = [f'col{i}_val_{random.randint(1, 10000)}' for _ in range(batch_size)] |
| |
| # Add partition column data |
| data['dt'] = ['2025-09-01' for _ in range(batch_size)] |
| # Convert dictionary to PyArrow RecordBatch |
| arrow_batch = pa.RecordBatch.from_pydict(data) |
| # Create new write and commit objects for each commit batch |
| write_builder = table.new_batch_write_builder() |
| table_write = write_builder.new_write() |
| table_commit = write_builder.new_commit() |
| |
| try: |
| # Write current batch data |
| table_write.write_arrow_batch(arrow_batch) |
| print("Batch data write completed, committing...") |
| # Commit current batch |
| commit_messages = table_write.prepare_commit() |
| table_commit.commit(commit_messages) |
| print(f"Batch {commit_batch + 1} committed successfully! Written {end_idx:,} rows of data") |
| |
| finally: |
| # Ensure resource cleanup |
| table_write.close() |
| table_commit.close() |
| |
| print( |
| f"All data writing completed! " |
| f"Total written {total_rows:,} rows of data to 200-column wide table in {commit_batches} commits") |
| rest_catalog = RESTCatalog(CatalogContext.create_from_options(Options(self.options))) |
| table = rest_catalog.get_table('default.wide_table_200cols') |
| predicate_builder = table.new_read_builder().new_predicate_builder() |
| read_builder = (table.new_read_builder() |
| .with_projection(['f0', 'f1']) |
| .with_filter(predicate=predicate_builder.equal("dt", "2025-09-01"))) |
| table_read = read_builder.new_read() |
| splits = read_builder.new_scan().plan().splits() |
| self.assertEqual(table_read.to_arrow(splits).num_rows, total_rows) |
| |
| def test_to_bytes_with_long_string(self): |
| """Test serialization of strings longer than 7 bytes which require variable part storage.""" |
| # Create fields with a long string value |
| fields = [ |
| DataField(0, "long_string", AtomicType("STRING")), |
| ] |
| |
| # Strings longer than 7 bytes are stored in the variable-length part |
| long_string = "This is a long string that exceeds 7 bytes" |
| values = [long_string] |
| |
| binary_row = GenericRow(values, fields, RowKind.INSERT) |
| serialized_bytes = GenericRowSerializer.to_bytes(binary_row) |
| |
| # Verify the last 6 bytes are zero padding: |
| # the 42-byte UTF-8 string in the variable part is padded to the next 8-byte word (48 bytes), |
| # which leaves 6 trailing zero bytes |
| self.assertEqual(serialized_bytes[-6:], b'\x00\x00\x00\x00\x00\x00') |
| self.assertEqual(serialized_bytes[20:62].decode('utf-8'), long_string) |
| # Deserialize to verify |
| deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields) |
| |
| self.assertEqual(deserialized_row.values[0], long_string) |
| self.assertEqual(deserialized_row.row_kind, RowKind.INSERT) |
| |
| def test_value_stats_cols_logic(self): |
| """Test _VALUE_STATS_COLS logic in ManifestFileManager.""" |
| self.rest_catalog.create_database("test_db", False) |
| |
| # Define schema with multiple fields |
| pa_schema = pa.schema([ |
| ('id', pa.int64()), |
| ('name', pa.string()), |
| ('price', pa.float64()), |
| ('category', pa.string()) |
| ]) |
| schema = Schema.from_pyarrow_schema(pa_schema) |
| self.rest_catalog.create_table("test_db.test_value_stats_cols", schema, False) |
| table = self.rest_catalog.get_table("test_db.test_value_stats_cols") |
| |
| # Create a ManifestFileManager |
| manifest_manager = ManifestFileManager(table) |
| |
| # Test case 1: _VALUE_STATS_COLS is None (should use all table fields) |
| self._test_value_stats_cols_case( |
| manifest_manager, |
| table, |
| value_stats_cols=None, |
| expected_fields_count=4, # All 4 fields |
| test_name="none_case" |
| ) |
| |
| # Test case 2: _VALUE_STATS_COLS is empty list (should use empty fields) |
| self._test_value_stats_cols_case( |
| manifest_manager, |
| table, |
| value_stats_cols=[], |
| expected_fields_count=0, # No fields |
| test_name="empty_case" |
| ) |
| |
| # Test case 3: _VALUE_STATS_COLS has specific columns |
| self._test_value_stats_cols_case( |
| manifest_manager, |
| table, |
| value_stats_cols=['id', 'name'], |
| expected_fields_count=2, # Only 2 specified fields |
| test_name="specific_case" |
| ) |
| |
| def test_incremental_timestamp(self): |
| schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) |
| self.rest_catalog.create_table('default.test_incremental_parquet', schema, False) |
| table = self.rest_catalog.get_table('default.test_incremental_parquet') |
| timestamp = int(time.time() * 1000) |
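| # capture a timestamp before any snapshot is committed, so the incremental reads below can bracket it |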
| self._write_test_table(table) |
| |
| snapshot_manager = SnapshotManager(table) |
| t1 = snapshot_manager.get_snapshot_by_id(1).time_millis |
| t2 = snapshot_manager.get_snapshot_by_id(2).time_millis |
| # test 1: a time range that ends before the first snapshot yields no rows |
| table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)}) |
| read_builder = table.new_read_builder() |
| actual = self._read_test_table(read_builder) |
| self.assertEqual(len(actual), 0) |
| # test 2: a range covering both snapshots yields all rows |
| table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp) + ',' + str(t2)}) |
| read_builder = table.new_read_builder() |
| actual = table_sort_by(self._read_test_table(read_builder), 'user_id') |
| self.assertEqual(self.expected, actual) |
| # test 3: (t1, t2] yields only the second commit's rows |
| table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(t1) + ',' + str(t2)}) |
| read_builder = table.new_read_builder() |
| actual = table_sort_by(self._read_test_table(read_builder), 'user_id') |
| expected = self.expected.slice(4, 4) |
| self.assertEqual(expected, actual) |
| |
| def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name): |
| """Helper method to test a specific _VALUE_STATS_COLS case.""" |
| |
| # Create test data based on expected_fields_count |
| if expected_fields_count == 0: |
| # Empty fields case |
| test_fields = [] |
| min_values = [] |
| max_values = [] |
| null_counts = [] |
| elif expected_fields_count == 2: |
| # Specific fields case (id, name) |
| test_fields = [ |
| DataField(0, "id", AtomicType("BIGINT")), |
| DataField(1, "name", AtomicType("STRING")) |
| ] |
| min_values = [1, "apple"] |
| max_values = [100, "zebra"] |
| null_counts = [0, 0] |
| else: |
| # All fields case |
| test_fields = table.table_schema.fields |
| min_values = [1, "apple", 10.5, "electronics"] |
| max_values = [100, "zebra", 999.9, "toys"] |
| null_counts = [0, 0, 0, 0] |
| |
| # Create GenericRows for min/max values |
| min_binary_row = GenericRow(min_values, test_fields) if test_fields else GenericRow([], []) |
| max_binary_row = GenericRow(max_values, test_fields) if test_fields else GenericRow([], []) |
| |
| # Create value_stats |
| value_stats = SimpleStats( |
| min_values=min_binary_row, |
| max_values=max_binary_row, |
| null_counts=null_counts |
| ) |
| |
| # Create key_stats (empty for simplicity) |
| empty_binary_row = GenericRow([], []) |
| key_stats = SimpleStats( |
| min_values=empty_binary_row, |
| max_values=empty_binary_row, |
| null_counts=[] |
| ) |
| |
| # Create DataFileMeta with value_stats_cols |
| file_meta = DataFileMeta( |
| file_name=f"test-file-{test_name}.parquet", |
| file_size=1024, |
| row_count=100, |
| min_key=empty_binary_row, |
| max_key=empty_binary_row, |
| key_stats=key_stats, |
| value_stats=value_stats, |
| min_sequence_number=1, |
| max_sequence_number=100, |
| schema_id=0, |
| level=0, |
| extra_files=[], |
| creation_time=1234567890, |
| delete_row_count=0, |
| embedded_index=None, |
| file_source=None, |
| value_stats_cols=value_stats_cols, # This is the key field we're testing |
| external_path=None, |
| first_row_id=None, |
| write_cols=None |
| ) |
| |
| # Create ManifestEntry |
| entry = ManifestEntry( |
| kind=0, # Normal entry |
| partition=empty_binary_row, |
| bucket=0, |
| total_buckets=1, |
| file=file_meta |
| ) |
| |
| # Write the manifest entry |
| manifest_file_name = f"manifest-test-{test_name}" |
| manifest_manager.write(manifest_file_name, [entry]) |
| |
| # Read the manifest entry back |
| entries = manifest_manager.read(manifest_file_name, drop_stats=False) |
| |
| # Verify we have exactly one entry |
| self.assertEqual(len(entries), 1) |
| |
| # Get the entry |
| read_entry = entries[0] |
| |
| # Verify value_stats_cols is preserved correctly |
| self.assertEqual(read_entry.file.value_stats_cols, value_stats_cols) |
| |
| # Verify value_stats structure based on the logic |
| if value_stats_cols is None: |
| # Should use all table fields - verify we have data for all fields |
| self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count) |
| self.assertEqual(read_entry.file.value_stats.max_values.arity, expected_fields_count) |
| self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) |
| elif not value_stats_cols: # Empty list |
| # Should use empty fields - verify we have no field data |
| self.assertEqual(read_entry.file.value_stats.min_values.arity, 0) |
| self.assertEqual(read_entry.file.value_stats.max_values.arity, 0) |
| self.assertEqual(len(read_entry.file.value_stats.null_counts), 0) |
| else: |
| # Should use specified fields - verify we have data for specified fields only |
| self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count) |
| self.assertEqual(read_entry.file.value_stats.max_values.arity, expected_fields_count) |
| self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) |
| |
| # Verify the actual values match what we expect |
| if expected_fields_count > 0: |
| self.assertEqual( |
| GenericRowDeserializer.from_bytes(read_entry.file.value_stats.min_values.data, test_fields).values, |
| min_values) |
| self.assertEqual( |
| GenericRowDeserializer.from_bytes(read_entry.file.value_stats.max_values.data, test_fields).values, |
| max_values) |
| |
| self.assertEqual(read_entry.file.value_stats.null_counts, null_counts) |