| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| """ |
| import os |
| import shutil |
| import tempfile |
| import unittest |
| from typing import List |
| |
| import pyarrow as pa |
| |
| from pypaimon import CatalogFactory |
| from pypaimon.table.file_store_table import FileStoreTable |
| from pypaimon.write.commit_message import CommitMessage |
| |
| |
| class DataBlobWriterTest(unittest.TestCase): |
| """Tests for DataBlobWriter functionality with paimon table operations.""" |
| |
| @classmethod |
| def setUpClass(cls): |
| """Set up test environment.""" |
| cls.temp_dir = tempfile.mkdtemp() |
| cls.warehouse = os.path.join(cls.temp_dir, 'warehouse') |
| # Create catalog for table operations |
| cls.catalog = CatalogFactory.create({ |
| 'warehouse': cls.warehouse |
| }) |
| cls.catalog.create_database('test_db', False) |
| |
| @classmethod |
| def tearDownClass(cls): |
| """Clean up test environment.""" |
| try: |
| shutil.rmtree(cls.temp_dir) |
| except OSError: |
| pass |
| |
| def test_data_blob_writer_basic_functionality(self): |
| """Test basic DataBlobWriter functionality with paimon table.""" |
| from pypaimon import Schema |
| |
| # Create schema with normal and blob columns |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('name', pa.string()), |
| ('blob_data', pa.large_binary()), # This will be detected as blob |
| ]) |
| |
| # Create Paimon schema |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| |
| # Create table |
| self.catalog.create_table('test_db.blob_writer_test', schema, False) |
| table = self.catalog.get_table('test_db.blob_writer_test') |
| |
| # Test data |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2, 3], |
| 'name': ['Alice', 'Bob', 'Charlie'], |
| 'blob_data': [b'blob_data_1', b'blob_data_2', b'blob_data_3'] |
| }, schema=pa_schema) |
| |
| # Create the writer through the standard batch-write table API |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
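| # For a schema containing a blob column (with data-evolution enabled), the internal write path |
| # is expected to use DataBlobWriter; see test_data_blob_writer_schema_detection below. |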
| |
| # Write test data using BatchTableWrite API |
| blob_writer.write_arrow(test_data) |
| |
| # Test prepare commit |
| commit_messages = blob_writer.prepare_commit() |
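| # prepare_commit() only stages the written files as CommitMessages; committing them through |
| # new_commit().commit(...) is exercised in the end-to-end tests further down. |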
| self.assertIsInstance(commit_messages, list) |
| self.assertGreater(len(commit_messages), 0) |
| |
| # Verify commit message structure |
| for commit_msg in commit_messages: |
| self.assertIsInstance(commit_msg.new_files, list) |
| self.assertGreater(len(commit_msg.new_files), 0) |
| |
| # Verify file metadata structure |
| for file_meta in commit_msg.new_files: |
| self.assertIsNotNone(file_meta.file_name) |
| self.assertGreater(file_meta.file_size, 0) |
| self.assertGreater(file_meta.row_count, 0) |
| |
| blob_writer.close() |
| |
| def test_data_blob_writer_schema_detection(self): |
| """Test that DataBlobWriter correctly detects blob columns from schema.""" |
| from pypaimon import Schema |
| |
| # Test schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('blob_field', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.blob_detection_test', schema, False) |
| table = self.catalog.get_table('test_db.blob_detection_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
| |
| # Test that DataBlobWriter was created internally |
| # We can verify this by checking the internal data writers |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2, 3], |
| 'blob_field': [b'blob1', b'blob2', b'blob3'] |
| }, schema=pa_schema) |
| |
| # Write data to trigger writer creation |
| blob_writer.write_arrow(test_data) |
| |
| # Verify that a DataBlobWriter was created internally |
| data_writers = blob_writer.file_store_write.data_writers |
| self.assertGreater(len(data_writers), 0) |
| |
| # Check that the writer is a DataBlobWriter |
| for writer in data_writers.values(): |
| from pypaimon.write.writer.data_blob_writer import DataBlobWriter |
| self.assertIsInstance(writer, DataBlobWriter) |
| |
| blob_writer.close() |
| |
| def test_data_blob_writer_no_blob_column(self): |
| """Test that DataBlobWriter raises error when no blob column is found.""" |
| from pypaimon import Schema |
| |
| # Test schema without blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('name', pa.string()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.no_blob_test', schema, False) |
| table = self.catalog.get_table('test_db.no_blob_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| writer = write_builder.new_write() |
| |
| # Test that a regular writer (not DataBlobWriter) was created |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2, 3], |
| 'name': ['Alice', 'Bob', 'Charlie'] |
| }, schema=pa_schema) |
| |
| # Write data to trigger writer creation |
| writer.write_arrow(test_data) |
| |
| # Verify that a regular writer was created (not DataBlobWriter) |
| data_writers = writer.file_store_write.data_writers |
| self.assertGreater(len(data_writers), 0) |
| |
| # Check that the writer is NOT a DataBlobWriter |
| for writer_instance in data_writers.values(): |
| from pypaimon.write.writer.data_blob_writer import DataBlobWriter |
| self.assertNotIsInstance(writer_instance, DataBlobWriter) |
| |
| writer.close() |
| |
| def test_data_blob_writer_multiple_blob_columns(self): |
| """Test that DataBlobWriter raises error when multiple blob columns are found.""" |
| from pypaimon import Schema |
| |
| # Test schema with multiple blob columns |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('blob1', pa.large_binary()), |
| ('blob2', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.multiple_blob_test', schema, False) |
| table = self.catalog.get_table('test_db.multiple_blob_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| writer = write_builder.new_write() |
| |
| # Test data with multiple blob columns |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2, 3], |
| 'blob1': [b'blob1_1', b'blob1_2', b'blob1_3'], |
| 'blob2': [b'blob2_1', b'blob2_2', b'blob2_3'] |
| }, schema=pa_schema) |
| |
| # This should raise an error when DataBlobWriter is created internally |
| with self.assertRaises(ValueError) as context: |
| writer.write_arrow(test_data) |
| self.assertIn("Limit exactly one blob field in one paimon table yet", str(context.exception)) |
| |
| def test_data_blob_writer_write_operations(self): |
| """Test DataBlobWriter write operations with real data.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('name', pa.string()), |
| ('document', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.write_test', schema, False) |
| table = self.catalog.get_table('test_db.write_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
| |
| # Test data |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2], |
| 'name': ['Alice', 'Bob'], |
| 'document': [b'document_content_1', b'document_content_2'] |
| }, schema=pa_schema) |
| |
| # Test writing data |
| for batch in test_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| |
| # Test prepare commit |
| commit_messages = blob_writer.prepare_commit() |
| self.assertIsInstance(commit_messages, list) |
| |
| blob_writer.close() |
| |
| def test_data_blob_writer_write_large_blob(self): |
| """Test DataBlobWriter with very large blob data (50MB per item) in 10 batches.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('description', pa.string()), |
| ('large_blob', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.large_blob_test', schema, False) |
| table = self.catalog.get_table('test_db.large_blob_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
| |
| # Create 50MB blob data per item |
| # Using a pattern to make the data more realistic and compressible |
| target_size = 50 * 1024 * 1024 # 50MB in bytes |
| blob_pattern = b'LARGE_BLOB_DATA_PATTERN_' + b'X' * 1024 # ~1KB pattern |
| pattern_size = len(blob_pattern) |
| repetitions = target_size // pattern_size |
| large_blob_data = blob_pattern * repetitions |
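| # Whole-pattern repetition leaves the payload slightly under 50MB, hence the 49-51MB range check below |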
| |
| # Verify the blob size is approximately 50MB |
| blob_size_mb = len(large_blob_data) / (1024 * 1024) |
| self.assertGreater(blob_size_mb, 49) # Should be at least 49MB |
| self.assertLess(blob_size_mb, 51) # Should be less than 51MB |
| |
| total_rows = 0 |
| |
| # Write 10 batches, each with 5 rows (50 rows total) |
| # Total data volume: 50 rows * 50MB = 2.5GB of blob data |
| for batch_num in range(10): |
| batch_data = pa.Table.from_pydict({ |
| 'id': [batch_num * 5 + i for i in range(5)], |
| 'description': [f'Large blob batch {batch_num}, row {i}' for i in range(5)], |
| 'large_blob': [large_blob_data] * 5 # 5 rows per batch, each with 50MB blob |
| }, schema=pa_schema) |
| |
| # Write each batch |
| for batch in batch_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| # Log progress for large data processing |
| print(f"Completed batch {batch_num + 1}/10 with {batch.num_rows} rows") |
| |
| # Record count is tracked internally by DataBlobWriter |
| |
| # Test prepare commit |
| commit_messages: List[CommitMessage] = blob_writer.prepare_commit() |
| self.assertIsInstance(commit_messages, list) |
| # Verify we have commit messages |
| self.assertEqual(len(commit_messages), 1) |
| commit_message = commit_messages[0] |
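| # This assumes new_files lists the parquet file for the normal columns first, |
| # followed by the rolled blob file(s). |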
| normal_file_meta = commit_message.new_files[0] |
| blob_file_metas = commit_message.new_files[1:] |
| # Validate row count consistency |
| parquet_row_count = normal_file_meta.row_count |
| blob_row_count_sum = sum(meta.row_count for meta in blob_file_metas) |
| self.assertEqual(parquet_row_count, blob_row_count_sum, |
| f"Parquet row count ({parquet_row_count}) should equal " |
| f"sum of blob row counts ({blob_row_count_sum})") |
| |
| # Verify commit message structure and file metadata |
| total_file_size = 0 |
| total_row_count = parquet_row_count |
| for commit_msg in commit_messages: |
| self.assertIsInstance(commit_msg.new_files, list) |
| self.assertGreater(len(commit_msg.new_files), 0) |
| |
| # Verify file metadata structure |
| for file_meta in commit_msg.new_files: |
| self.assertIsNotNone(file_meta.file_name) |
| self.assertGreater(file_meta.file_size, 0) |
| self.assertGreater(file_meta.row_count, 0) |
| total_file_size += file_meta.file_size |
| |
| # Verify total rows written (10 batches * 5 rows = 50 rows) |
| self.assertEqual(total_row_count, 50) |
| |
| # Verify total file size is substantial (the raw blob payload alone is about 2.5GB) |
| total_size_mb = total_file_size / (1024 * 1024) |
| self.assertGreater(total_size_mb, 2000) # Should be at least ~2GB |
| |
| total_files = sum(len(commit_msg.new_files) for commit_msg in commit_messages) |
| print(f"Total data written: {total_size_mb:.2f}MB across {total_files} files") |
| print(f"Total rows processed: {total_row_count}") |
| |
| blob_writer.close() |
| |
| def test_data_blob_writer_abort_functionality(self): |
| """Test DataBlobWriter abort functionality.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('blob_data', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.abort_test', schema, False) |
| table = self.catalog.get_table('test_db.abort_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
| |
| # Test data |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2], |
| 'blob_data': [b'blob_1', b'blob_2'] |
| }, schema=pa_schema) |
| |
| # Write some data |
| for batch in test_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| |
| # Test abort - BatchTableWrite doesn't have abort method |
| # The abort functionality is handled internally by DataBlobWriter |
| |
| blob_writer.close() |
| |
| def test_data_blob_writer_multiple_batches(self): |
| """Test DataBlobWriter with multiple batches and verify results.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('name', pa.string()), |
| ('document', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.multiple_batches_test', schema, False) |
| table = self.catalog.get_table('test_db.multiple_batches_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
| |
| # Test data - multiple batches |
| batch1_data = pa.Table.from_pydict({ |
| 'id': [1, 2], |
| 'name': ['Alice', 'Bob'], |
| 'document': [b'document_1_content', b'document_2_content'] |
| }, schema=pa_schema) |
| |
| batch2_data = pa.Table.from_pydict({ |
| 'id': [3, 4, 5], |
| 'name': ['Charlie', 'David', 'Eve'], |
| 'document': [b'document_3_content', b'document_4_content', b'document_5_content'] |
| }, schema=pa_schema) |
| |
| batch3_data = pa.Table.from_pydict({ |
| 'id': [6], |
| 'name': ['Frank'], |
| 'document': [b'document_6_content'] |
| }, schema=pa_schema) |
| |
| # Write multiple batches |
| total_rows = 0 |
| for batch in batch1_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| for batch in batch2_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| for batch in batch3_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| # Record count is tracked internally by DataBlobWriter |
| |
| # Test prepare commit |
| commit_messages = blob_writer.prepare_commit() |
| self.assertIsInstance(commit_messages, list) |
| |
| # Verify we have committed files |
| self.assertGreater(len(commit_messages), 0) |
| |
| blob_writer.close() |
| |
| def test_data_blob_writer_large_batches(self): |
| """Test DataBlobWriter with large batches to test rolling behavior.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('description', pa.string()), |
| ('large_blob', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.large_batches_test', schema, False) |
| table = self.catalog.get_table('test_db.large_batches_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
| |
| # Create large batches with substantial blob data |
| large_blob_data = b'L' * 10000 # 10KB blob data |
| |
| # Batch 1: 100 rows |
| batch1_data = pa.Table.from_pydict({ |
| 'id': list(range(1, 101)), |
| 'description': [f'Description for row {i}' for i in range(1, 101)], |
| 'large_blob': [large_blob_data] * 100 |
| }, schema=pa_schema) |
| |
| # Batch 2: 50 rows |
| batch2_data = pa.Table.from_pydict({ |
| 'id': list(range(101, 151)), |
| 'description': [f'Description for row {i}' for i in range(101, 151)], |
| 'large_blob': [large_blob_data] * 50 |
| }, schema=pa_schema) |
| |
| # Write large batches |
| total_rows = 0 |
| for batch in batch1_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| for batch in batch2_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| # Record count is tracked internally by DataBlobWriter |
| |
| # Test prepare commit |
| commit_messages = blob_writer.prepare_commit() |
| self.assertIsInstance(commit_messages, list) |
| |
| # Verify we have committed files |
| self.assertGreater(len(commit_messages), 0) |
| |
| blob_writer.close() |
| |
| def test_data_blob_writer_mixed_data_types(self): |
| """Test DataBlobWriter with mixed data types in blob column.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('type', pa.string()), |
| ('data', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.mixed_data_test', schema, False) |
| table = self.catalog.get_table('test_db.mixed_data_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
| |
| # Test data with different types of blob content |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2, 3, 4, 5], |
| 'type': ['text', 'json', 'binary', 'image', 'pdf'], |
| 'data': [ |
| b'This is text content', |
| b'{"key": "value", "number": 42}', |
| b'\x00\x01\x02\x03\xff\xfe\xfd', |
| b'PNG_IMAGE_DATA_PLACEHOLDER', |
| b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER' |
| ] |
| }, schema=pa_schema) |
| |
| # Write mixed data |
| total_rows = 0 |
| for batch in test_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| # Record count is tracked internally by DataBlobWriter |
| |
| # Test prepare commit |
| commit_messages = blob_writer.prepare_commit() |
| self.assertIsInstance(commit_messages, list) |
| |
| # Verify we have committed files |
| self.assertGreater(len(commit_messages), 0) |
| |
| # Verify commit message structure |
| for commit_msg in commit_messages: |
| self.assertIsInstance(commit_msg.new_files, list) |
| self.assertGreater(len(commit_msg.new_files), 0) |
| |
| # Verify file metadata structure |
| for file_meta in commit_msg.new_files: |
| self.assertIsNotNone(file_meta.file_name) |
| self.assertGreater(file_meta.file_size, 0) |
| self.assertGreater(file_meta.row_count, 0) |
| |
| # Should have both normal and blob files |
| file_names = [f.file_name for f in commit_msg.new_files] |
| parquet_files = [f for f in file_names if f.endswith('.parquet')] |
| blob_files = [f for f in file_names if f.endswith('.blob')] |
| |
| self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file") |
| self.assertGreater(len(blob_files), 0, "Should have at least one blob file") |
| |
| # Create commit and commit the data |
| commit = write_builder.new_commit() |
| commit.commit(commit_messages) |
| blob_writer.close() |
| |
| # Read data back using table API |
| read_builder = table.new_read_builder() |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| result = table_read.to_arrow(table_scan.plan().splits()) |
| |
| # Verify the data was read back correctly |
| self.assertEqual(result.num_rows, 5, "Should have 5 rows") |
| self.assertEqual(result.num_columns, 3, "Should have 3 columns") |
| |
| # Convert result to pandas for easier comparison |
| result_df = result.to_pandas() |
| |
| # Verify each row matches the original data |
| for i in range(5): |
| original_id = test_data.column('id')[i].as_py() |
| original_type = test_data.column('type')[i].as_py() |
| original_data = test_data.column('data')[i].as_py() |
| |
| result_id = result_df.iloc[i]['id'] |
| result_type = result_df.iloc[i]['type'] |
| result_data = result_df.iloc[i]['data'] |
| |
| self.assertEqual(result_id, original_id, f"Row {i + 1}: ID should match") |
| self.assertEqual(result_type, original_type, f"Row {i + 1}: Type should match") |
| self.assertEqual(result_data, original_data, f"Row {i + 1}: Blob data should match") |
| |
| def test_data_blob_writer_empty_batches(self): |
| """Test DataBlobWriter with empty batches.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('data', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.empty_batches_test', schema, False) |
| table = self.catalog.get_table('test_db.empty_batches_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
| |
| # Test data with some empty batches |
| batch1_data = pa.Table.from_pydict({ |
| 'id': [1, 2], |
| 'data': [b'data1', b'data2'] |
| }, schema=pa_schema) |
| |
| # Empty batch |
| empty_batch = pa.Table.from_pydict({ |
| 'id': [], |
| 'data': [] |
| }, schema=pa_schema) |
| |
| batch2_data = pa.Table.from_pydict({ |
| 'id': [3], |
| 'data': [b'data3'] |
| }, schema=pa_schema) |
| |
| # Write batches including empty ones |
| total_rows = 0 |
| for batch in batch1_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| for batch in empty_batch.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| for batch in batch2_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| # Verify record count (empty batch should not affect count) |
| # Record count is tracked internally by DataBlobWriter |
| |
| # Test prepare commit |
| commit_messages = blob_writer.prepare_commit() |
| self.assertIsInstance(commit_messages, list) |
| |
| blob_writer.close() |
| |
| def test_data_blob_writer_rolling_behavior(self): |
| """Test DataBlobWriter rolling behavior with multiple commits.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('content', pa.string()), |
| ('blob_data', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.rolling_test', schema, False) |
| table = self.catalog.get_table('test_db.rolling_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
| |
| # Create data across multiple batches to exercise the rolling code path |
| large_content = 'X' * 1000 # Large string content |
| large_blob = b'B' * 5000 # Large blob data |
| |
| # Write multiple batches to test rolling |
| for i in range(10): # 10 batches |
| batch_data = pa.Table.from_pydict({ |
| 'id': [i * 10 + j for j in range(10)], |
| 'content': [f'{large_content}_{i}_{j}' for j in range(10)], |
| 'blob_data': [large_blob] * 10 |
| }, schema=pa_schema) |
| |
| for batch in batch_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| |
| # Verify total record count |
| # Record count is tracked internally by DataBlobWriter |
| |
| # Test prepare commit |
| commit_messages = blob_writer.prepare_commit() |
| self.assertIsInstance(commit_messages, list) |
| |
| # Verify we have committed files |
| self.assertGreater(len(commit_messages), 0) |
| |
| # Verify file metadata structure |
| for commit_msg in commit_messages: |
| for file_meta in commit_msg.new_files: |
| self.assertIsNotNone(file_meta.file_name) |
| self.assertGreater(file_meta.file_size, 0) |
| self.assertGreater(file_meta.row_count, 0) |
| |
| blob_writer.close() |
| |
| def test_blob_write_read_end_to_end(self): |
| """Test complete end-to-end blob functionality: write blob data and read it back to verify correctness.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('name', pa.string()), |
| ('description', pa.string()), |
| ('blob_data', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.blob_write_read_e2e', schema, False) |
| table = self.catalog.get_table('test_db.blob_write_read_e2e') |
| |
| # Test data with various blob sizes and types |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2, 3, 4, 5], |
| 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], |
| 'description': ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'], |
| 'blob_data': [ |
| b'small_blob_1', |
| b'medium_blob_data_2_with_more_content', |
| b'large_blob_data_3_with_even_more_content_and_details', |
| b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501 |
| b'extremely_large_blob_data_5_with_comprehensive_content_and_' |
| b'extensive_details_covering_multiple_aspects' # noqa: E501 |
| ] |
| }, schema=pa_schema) |
| |
| # Write data using table API |
| write_builder = table.new_batch_write_builder() |
| writer = write_builder.new_write() |
| writer.write_arrow(test_data) |
| |
| # Commit the data |
| commit_messages = writer.prepare_commit() |
| self.assertGreater(len(commit_messages), 0) |
| |
| # Verify commit message structure |
| for commit_msg in commit_messages: |
| self.assertIsInstance(commit_msg.new_files, list) |
| self.assertGreater(len(commit_msg.new_files), 0) |
| |
| # Should have both normal and blob files |
| file_names = [f.file_name for f in commit_msg.new_files] |
| parquet_files = [f for f in file_names if f.endswith('.parquet')] |
| blob_files = [f for f in file_names if f.endswith('.blob')] |
| |
| self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file") |
| self.assertGreater(len(blob_files), 0, "Should have at least one blob file") |
| |
| # Create commit and commit the data |
| commit = write_builder.new_commit() |
| commit.commit(commit_messages) |
| writer.close() |
| |
| # Read data back using table API |
| read_builder = table.new_read_builder() |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| result = table_read.to_arrow(table_scan.plan().splits()) |
| |
| # Verify the data was read back correctly |
| self.assertEqual(result.num_rows, 5, "Should have 5 rows") |
| self.assertEqual(result.num_columns, 4, "Should have 4 columns") |
| |
| # Verify normal columns |
| self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID column should match") |
| self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], |
| "Name column should match") # noqa: E501 |
| self.assertEqual(result.column('description').to_pylist(), ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'], |
| "Description column should match") # noqa: E501 |
| |
| # Verify blob data correctness |
| blob_data = result.column('blob_data').to_pylist() |
| expected_blobs = [ |
| b'small_blob_1', |
| b'medium_blob_data_2_with_more_content', |
| b'large_blob_data_3_with_even_more_content_and_details', |
| b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501 |
| b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects' |
| # noqa: E501 |
| ] |
| |
| self.assertEqual(len(blob_data), 5, "Should have 5 blob records") |
| self.assertEqual(blob_data, expected_blobs, "Blob data should match exactly") |
| |
| # Verify individual blob sizes |
| for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)): |
| self.assertEqual(len(actual_blob), len(expected_blob), f"Blob {i + 1} size should match") |
| self.assertEqual(actual_blob, expected_blob, f"Blob {i + 1} content should match exactly") |
| |
| print( |
| f"✅ End-to-end blob write/read test passed: wrote and read back {len(blob_data)} blob records correctly") # noqa: E501 |
| |
| def test_blob_write_read_partition(self): |
| """Test complete end-to-end blob functionality: write blob data and read it back to verify correctness.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('name', pa.string()), |
| ('description', pa.string()), |
| ('blob_data', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, partition_keys=['name'], |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.blob_write_read_partition', schema, False) |
| table = self.catalog.get_table('test_db.blob_write_read_partition') |
| |
| # Test data with various blob sizes and types |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2, 3, 4, 5], |
| 'name': ['Alice', 'Alice', 'David', 'David', 'David'], |
| 'description': ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'], |
| 'blob_data': [ |
| b'small_blob_1', |
| b'medium_blob_data_2_with_more_content', |
| b'large_blob_data_3_with_even_more_content_and_details', |
| b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501 |
| b'extremely_large_blob_data_5_with_comprehensive_content_and_' |
| b'extensive_details_covering_multiple_aspects' |
| # noqa: E501 |
| ] |
| }, schema=pa_schema) |
| |
| # Write data using table API |
| write_builder = table.new_batch_write_builder() |
| writer = write_builder.new_write() |
| writer.write_arrow(test_data) |
| |
| # Commit the data |
| commit_messages = writer.prepare_commit() |
| |
| # Create commit and commit the data |
| commit = write_builder.new_commit() |
| commit.commit(commit_messages) |
| writer.close() |
| |
| # Read data back using table API |
| read_builder = table.new_read_builder() |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| splits = table_scan.plan().splits() |
| result = table_read.to_arrow(splits) |
| |
| # Verify the data was read back correctly |
| self.assertEqual(result.num_rows, 5, "Should have 5 rows") |
| self.assertEqual(result.num_columns, 4, "Should have 4 columns") |
| |
| # Verify normal columns |
| self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID column should match") |
| self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Alice', 'David', 'David', 'David'], |
| "Name column should match") # noqa: E501 |
| self.assertEqual(result.column('description').to_pylist(), ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'], |
| "Description column should match") # noqa: E501 |
| |
| # Verify blob data correctness |
| blob_data = result.column('blob_data').to_pylist() |
| expected_blobs = [ |
| b'small_blob_1', |
| b'medium_blob_data_2_with_more_content', |
| b'large_blob_data_3_with_even_more_content_and_details', |
| b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501 |
| b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects' |
| # noqa: E501 |
| ] |
| |
| self.assertEqual(len(blob_data), 5, "Should have 5 blob records") |
| self.assertEqual(blob_data, expected_blobs, "Blob data should match exactly") |
| |
| # Verify individual blob sizes |
| for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)): |
| self.assertEqual(len(actual_blob), len(expected_blob), f"Blob {i + 1} size should match") |
| self.assertEqual(actual_blob, expected_blob, f"Blob {i + 1} content should match exactly") |
| |
| print( |
| f"✅ End-to-end blob write/read test passed: wrote and read back {len(blob_data)} blob records correctly") # noqa: E501 |
| |
| def test_blob_write_read_end_to_end_with_descriptor(self): |
| """Test end-to-end blob functionality using blob descriptors.""" |
| import random |
| from pypaimon import Schema |
| from pypaimon.table.row.blob import BlobDescriptor, Blob |
| from pypaimon.common.uri_reader import UriReaderFactory |
| from pypaimon.common.config import CatalogOptions |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('name', pa.string()), |
| ('picture', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true', |
| 'blob-as-descriptor': 'true' |
| } |
| ) |
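| # With 'blob-as-descriptor' enabled, the blob column is written and read as serialized |
| # BlobDescriptor bytes rather than raw blob content, as exercised below. |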
| |
| # Create table |
| self.catalog.create_table('test_db.blob_descriptor_test', schema, False) |
| table: FileStoreTable = self.catalog.get_table('test_db.blob_descriptor_test') |
| |
| # Create test blob data (1MB) |
| random.seed(42) # For reproducible tests |
| blob_data = bytes(random.randint(0, 255) for _ in range(1024 * 1024)) |
| |
| # Create external blob file |
| external_blob_path = os.path.join(self.temp_dir, 'external_blob') |
| with open(external_blob_path, 'wb') as f: |
| f.write(blob_data) |
| |
| # Create blob descriptor pointing to external file |
| blob_descriptor = BlobDescriptor(external_blob_path, 0, len(blob_data)) |
| |
| # Create test data with blob descriptor |
| test_data = pa.Table.from_pydict({ |
| 'id': [1], |
| 'name': ['paimon'], |
| 'picture': [blob_descriptor.serialize()] |
| }, schema=pa_schema) |
| |
| # Write data using table API |
| write_builder = table.new_batch_write_builder() |
| writer = write_builder.new_write() |
| writer.write_arrow(test_data) |
| |
| # Commit the data |
| commit_messages = writer.prepare_commit() |
| commit = write_builder.new_commit() |
| commit.commit(commit_messages) |
| |
| # Read data back |
| read_builder = table.new_read_builder() |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| result = table_read.to_arrow(table_scan.plan().splits()) |
| |
| # Verify the data was written and read correctly |
| self.assertEqual(result.num_rows, 1, "Should have 1 row") |
| self.assertEqual(result.column('id').to_pylist(), [1], "ID should match") |
| self.assertEqual(result.column('name').to_pylist(), ['paimon'], "Name should match") |
| |
| # Get the blob descriptor bytes from the result |
| picture_bytes = result.column('picture').to_pylist()[0] |
| self.assertIsInstance(picture_bytes, bytes, "Picture should be bytes") |
| |
| # Deserialize the blob descriptor |
| new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes) |
| |
| # The URI might be different if the blob was stored in the table's data directory |
| # Let's verify the descriptor properties and try to read the data |
| # Note: offset might be non-zero due to blob file format overhead |
| self.assertGreaterEqual(new_blob_descriptor.offset, 0, "Offset should be non-negative") |
| self.assertEqual(new_blob_descriptor.length, len(blob_data), "Length should match") |
| |
| # Create URI reader factory and read the blob data |
| catalog_options = {CatalogOptions.WAREHOUSE: self.warehouse} |
| uri_reader_factory = UriReaderFactory(catalog_options) |
| uri_reader = uri_reader_factory.create(new_blob_descriptor.uri) |
| blob = Blob.from_descriptor(uri_reader, new_blob_descriptor) |
| |
| # Verify the blob data matches the original |
| self.assertEqual(blob.to_data(), blob_data, "Blob data should match original") |
| |
| print("✅ Blob descriptor end-to-end test passed:") |
| print(" - Created external blob file and descriptor") |
| print(" - Wrote and read blob descriptor successfully") |
| print(" - Verified blob data can be read from descriptor") |
| print(" - Tested blob-as-descriptor=true mode") |
| |
| def test_blob_write_read_large_data_end_to_end(self): |
| """Test end-to-end blob functionality with large blob data (1MB per blob).""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('metadata', pa.string()), |
| ('large_blob', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.blob_large_write_read_e2e', schema, False) |
| table = self.catalog.get_table('test_db.blob_large_write_read_e2e') |
| |
| # Create large blob data (1MB per blob) |
| large_blob_size = 1024 * 1024 # 1MB |
| blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern |
| pattern_size = len(blob_pattern) |
| repetitions = large_blob_size // pattern_size |
| large_blob_data = blob_pattern * repetitions |
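| # The repeated pattern yields slightly under 1MB per blob, so size checks below compare |
| # against len(large_blob_data) rather than the nominal 1MB target. |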
| |
| # Test data with large blobs |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2, 3], |
| 'metadata': ['Large blob 1', 'Large blob 2', 'Large blob 3'], |
| 'large_blob': [large_blob_data, large_blob_data, large_blob_data] |
| }, schema=pa_schema) |
| |
| # Write data |
| write_builder = table.new_batch_write_builder() |
| writer = write_builder.new_write() |
| writer.write_arrow(test_data) |
| |
| # Commit the data |
| commit_messages = writer.prepare_commit() |
| self.assertGreater(len(commit_messages), 0) |
| |
| # Verify commit message structure |
| for commit_msg in commit_messages: |
| self.assertIsInstance(commit_msg.new_files, list) |
| self.assertGreater(len(commit_msg.new_files), 0) |
| |
| # Should have both normal and blob files |
| file_names = [f.file_name for f in commit_msg.new_files] |
| parquet_files = [f for f in file_names if f.endswith('.parquet')] |
| blob_files = [f for f in file_names if f.endswith('.blob')] |
| |
| self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file") |
| self.assertGreater(len(blob_files), 0, "Should have at least one blob file") |
| |
| commit = write_builder.new_commit() |
| commit.commit(commit_messages) |
| writer.close() |
| |
| # Read data back |
| read_builder = table.new_read_builder() |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| result = table_read.to_arrow(table_scan.plan().splits()) |
| |
| # Verify the data |
| self.assertEqual(result.num_rows, 3, "Should have 3 rows") |
| self.assertEqual(result.num_columns, 3, "Should have 3 columns") |
| |
| # Verify normal columns |
| self.assertEqual(result.column('id').to_pylist(), [1, 2, 3], "ID column should match") |
| self.assertEqual(result.column('metadata').to_pylist(), ['Large blob 1', 'Large blob 2', 'Large blob 3'], |
| "Metadata column should match") # noqa: E501 |
| |
| # Verify blob data integrity |
| blob_data = result.column('large_blob').to_pylist() |
| self.assertEqual(len(blob_data), 3, "Should have 3 blob records") |
| |
| for i, blob in enumerate(blob_data): |
| self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1} should be {len(large_blob_data)} bytes") |
| self.assertEqual(blob, large_blob_data, f"Blob {i + 1} content should match exactly") |
| print(f"✅ Verified large blob {i + 1}: {len(blob)} bytes") |
| |
| print( |
| f"✅ Large blob end-to-end test passed: wrote and read back {len(blob_data)} large blob records correctly") # noqa: E501 |
| |
| def test_blob_write_read_mixed_sizes_end_to_end(self): |
| """Test end-to-end blob functionality with mixed blob sizes.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('size_category', pa.string()), |
| ('blob_data', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.blob_mixed_sizes_write_read_e2e', schema, False) |
| table = self.catalog.get_table('test_db.blob_mixed_sizes_write_read_e2e') |
| |
| # Create blobs of different sizes |
| tiny_blob = b'tiny' |
| small_blob = b'small_blob_data' * 10 # ~140 bytes |
| medium_blob = b'medium_blob_data' * 100 # ~1.4KB |
| large_blob = b'large_blob_data' * 1000 # ~14KB |
| huge_blob = b'huge_blob_data' * 10000 # ~140KB |
| |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2, 3, 4, 5], |
| 'size_category': ['tiny', 'small', 'medium', 'large', 'huge'], |
| 'blob_data': [tiny_blob, small_blob, medium_blob, large_blob, huge_blob] |
| }, schema=pa_schema) |
| |
| # Write data |
| write_builder = table.new_batch_write_builder() |
| writer = write_builder.new_write() |
| writer.write_arrow(test_data) |
| |
| # Commit |
| commit_messages = writer.prepare_commit() |
| self.assertGreater(len(commit_messages), 0) |
| |
| # Verify commit message structure |
| for commit_msg in commit_messages: |
| self.assertIsInstance(commit_msg.new_files, list) |
| self.assertGreater(len(commit_msg.new_files), 0) |
| |
| # Should have both normal and blob files |
| file_names = [f.file_name for f in commit_msg.new_files] |
| parquet_files = [f for f in file_names if f.endswith('.parquet')] |
| blob_files = [f for f in file_names if f.endswith('.blob')] |
| |
| self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file") |
| self.assertGreater(len(blob_files), 0, "Should have at least one blob file") |
| |
| commit = write_builder.new_commit() |
| commit.commit(commit_messages) |
| writer.close() |
| |
| # Read back |
| read_builder = table.new_read_builder() |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| result = table_read.to_arrow(table_scan.plan().splits()) |
| |
| # Verify |
| self.assertEqual(result.num_rows, 5, "Should have 5 rows") |
| self.assertEqual(result.num_columns, 3, "Should have 3 columns") |
| |
| # Verify normal columns |
| self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID column should match") |
| self.assertEqual(result.column('size_category').to_pylist(), ['tiny', 'small', 'medium', 'large', 'huge'], |
| "Size category column should match") # noqa: E501 |
| |
| # Verify blob data |
| blob_data = result.column('blob_data').to_pylist() |
| expected_blobs = [tiny_blob, small_blob, medium_blob, large_blob, huge_blob] |
| |
| self.assertEqual(len(blob_data), 5, "Should have 5 blob records") |
| self.assertEqual(blob_data, expected_blobs, "Blob data should match exactly") |
| |
| # Verify sizes |
| sizes = [len(blob) for blob in blob_data] |
| expected_sizes = [len(blob) for blob in expected_blobs] |
| self.assertEqual(sizes, expected_sizes, "Blob sizes should match") |
| |
| # Verify individual blob content |
| for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)): |
| self.assertEqual(actual_blob, expected_blob, f"Blob {i + 1} content should match exactly") |
| |
| print( |
| f"✅ Mixed sizes end-to-end test passed: wrote and read back blobs ranging from {min(sizes)} to {max(sizes)} bytes") # noqa: E501 |
| |
| def test_blob_write_read_large_data_end_to_end_with_rolling(self): |
| """Test end-to-end blob functionality with large blob data (50MB per blob) and rolling behavior (40 blobs).""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('batch_id', pa.int32()), |
| ('metadata', pa.string()), |
| ('large_blob', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.blob_large_rolling_e2e', schema, False) |
| table = self.catalog.get_table('test_db.blob_large_rolling_e2e') |
| |
| # Create large blob data (50MB per blob) |
| large_blob_size = 50 * 1024 * 1024 # 50MB |
| blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern |
| pattern_size = len(blob_pattern) |
| repetitions = large_blob_size // pattern_size |
| large_blob_data = blob_pattern * repetitions |
| |
| # Report the generated blob size (slightly under 50MB due to whole-pattern repetition) |
| actual_size = len(large_blob_data) |
| print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 * 1024):.2f} MB)") |
| |
| # Write 40 batches of data (each with 1 blob of 50MB) |
| write_builder = table.new_batch_write_builder() |
| writer = write_builder.new_write() |
| |
| # Write all 40 batches first |
| for batch_id in range(40): |
| # Create test data for this batch |
| test_data = pa.Table.from_pydict({ |
| 'id': [batch_id + 1], |
| 'batch_id': [batch_id], |
| 'metadata': [f'Large blob batch {batch_id + 1}'], |
| 'large_blob': [large_blob_data] |
| }, schema=pa_schema) |
| |
| # Write data |
| writer.write_arrow(test_data) |
| |
| # Print progress every 10 batches |
| if (batch_id + 1) % 10 == 0: |
| print(f"✅ Written batch {batch_id + 1}/40: {len(large_blob_data):,} bytes") |
| |
| print("✅ Successfully wrote all 40 batches of 50MB blobs") |
| |
| # Commit all data at once |
| commit_messages = writer.prepare_commit() |
| self.assertGreater(len(commit_messages), 0) |
| |
| # Verify commit message structure |
| for commit_msg in commit_messages: |
| self.assertIsInstance(commit_msg.new_files, list) |
| self.assertGreater(len(commit_msg.new_files), 0) |
| |
| # Should have both normal and blob files |
| file_names = [f.file_name for f in commit_msg.new_files] |
| parquet_files = [f for f in file_names if f.endswith('.parquet')] |
| blob_files = [f for f in file_names if f.endswith('.blob')] |
| |
| self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file") |
| self.assertGreater(len(blob_files), 0, "Should have at least one blob file") |
| |
| # Commit the data |
| commit = write_builder.new_commit() |
| commit.commit(commit_messages) |
| writer.close() |
| |
| print(f"✅ Successfully committed {len(commit_messages)} commit messages with 40 batches of 50MB blobs") |
| |
| # Read data back |
| read_builder = table.new_read_builder() |
| table_scan = read_builder.new_scan() |
| table_read = read_builder.new_read() |
| result = table_read.to_arrow(table_scan.plan().splits()) |
| |
| # Verify the data |
| self.assertEqual(result.num_rows, 40, "Should have 40 rows") |
| self.assertEqual(result.num_columns, 4, "Should have 4 columns") |
| |
| # Verify normal columns |
| expected_ids = list(range(1, 41)) |
| expected_batch_ids = list(range(40)) |
| expected_metadata = [f'Large blob batch {i}' for i in range(1, 41)] |
| |
| self.assertEqual(result.column('id').to_pylist(), expected_ids, "ID column should match") |
| self.assertEqual(result.column('batch_id').to_pylist(), expected_batch_ids, |
| "Batch ID column should match") # noqa: E501 |
| self.assertEqual(result.column('metadata').to_pylist(), expected_metadata, |
| "Metadata column should match") # noqa: E501 |
| |
| # Verify blob data integrity |
| blob_data = result.column('large_blob').to_pylist() |
| self.assertEqual(len(blob_data), 40, "Should have 40 blob records") |
| |
| # Verify each blob |
| for i, blob in enumerate(blob_data): |
| self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1} should be {len(large_blob_data):,} bytes") |
| self.assertEqual(blob, large_blob_data, f"Blob {i + 1} content should match exactly") |
| |
| # Print progress every 10 blobs |
| if (i + 1) % 10 == 0: |
| print(f"✅ Verified blob {i + 1}/40: {len(blob):,} bytes") |
| |
| # Verify total data size |
| total_blob_size = sum(len(blob) for blob in blob_data) |
| expected_total_size = 40 * len(large_blob_data) |
| self.assertEqual(total_blob_size, expected_total_size, |
| f"Total blob size should be {expected_total_size:,} bytes") |
| |
| print("✅ Large blob rolling end-to-end test passed:") |
| print(" - Wrote and read back 40 blobs of 50MB each") |
| print( |
| f" - Total data size: {total_blob_size:,} bytes ({total_blob_size / (1024 * 1024 * 1024):.2f} GB)") # noqa: E501 |
| print(" - All blob content verified as correct") |
| |
| def test_data_blob_writer_with_shard(self): |
| """Test DataBlobWriter with mixed data types in blob column.""" |
| from pypaimon import Schema |
| |
| # Create schema with blob column |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('type', pa.string()), |
| ('data', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.with_shard_test', schema, False) |
| table = self.catalog.get_table('test_db.with_shard_test') |
| |
| # Use proper table API to create writer |
| write_builder = table.new_batch_write_builder() |
| blob_writer = write_builder.new_write() |
| |
| # Test data with different types of blob content |
| test_data = pa.Table.from_pydict({ |
| 'id': [1, 2, 3, 4, 5], |
| 'type': ['text', 'json', 'binary', 'image', 'pdf'], |
| 'data': [ |
| b'This is text content', |
| b'{"key": "value", "number": 42}', |
| b'\x00\x01\x02\x03\xff\xfe\xfd', |
| b'PNG_IMAGE_DATA_PLACEHOLDER', |
| b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER' |
| ] |
| }, schema=pa_schema) |
| |
| # Write mixed data |
| total_rows = 0 |
| for batch in test_data.to_batches(): |
| blob_writer.write_arrow_batch(batch) |
| total_rows += batch.num_rows |
| |
| # Test prepare commit |
| commit_messages = blob_writer.prepare_commit() |
| # Create commit and commit the data |
| commit = write_builder.new_commit() |
| commit.commit(commit_messages) |
| blob_writer.close() |
| |
| # Read data back using table API |
| read_builder = table.new_read_builder() |
| table_scan = read_builder.new_scan().with_shard(1, 2) |
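| # with_shard(1, 2) presumably restricts the scan to the shard at index 1 of 2, so only a |
| # subset of the 5 written rows (3 here) is expected in the result. |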
| table_read = read_builder.new_read() |
| splits = table_scan.plan().splits() |
| result = table_read.to_arrow(splits) |
| |
| # Verify the data was read back correctly |
| self.assertEqual(result.num_rows, 3, "Should have 5 rows") |
| self.assertEqual(result.num_columns, 3, "Should have 3 columns") |
| |
| def test_blob_read_row_by_row_iterator(self): |
| """Test reading blob data row by row using to_iterator().""" |
| from pypaimon import Schema |
| from pypaimon.table.row.blob import Blob |
| from pypaimon.table.row.internal_row import RowKind |
| |
| pa_schema = pa.schema([ |
| ('id', pa.int32()), |
| ('name', pa.string()), |
| ('blob_data', pa.large_binary()), |
| ]) |
| |
| schema = Schema.from_pyarrow_schema( |
| pa_schema, |
| options={ |
| 'row-tracking.enabled': 'true', |
| 'data-evolution.enabled': 'true' |
| } |
| ) |
| self.catalog.create_table('test_db.blob_iterator_test', schema, False) |
| table = self.catalog.get_table('test_db.blob_iterator_test') |
| |
| expected_data = { |
| 1: {'name': 'Alice', 'blob': b'blob_1'}, |
| 2: {'name': 'Bob', 'blob': b'blob_2_data'}, |
| 3: {'name': 'Charlie', 'blob': b'blob_3_content'}, |
| 4: {'name': 'David', 'blob': b'blob_4_large_content'}, |
| 5: {'name': 'Eve', 'blob': b'blob_5_very_large_content_data'} |
| } |
| |
| test_data = pa.Table.from_pydict({ |
| 'id': list(expected_data.keys()), |
| 'name': [expected_data[i]['name'] for i in expected_data.keys()], |
| 'blob_data': [expected_data[i]['blob'] for i in expected_data.keys()] |
| }, schema=pa_schema) |
| |
| write_builder = table.new_batch_write_builder() |
| writer = write_builder.new_write() |
| writer.write_arrow(test_data) |
| commit_messages = writer.prepare_commit() |
| commit = write_builder.new_commit() |
| commit.commit(commit_messages) |
| writer.close() |
| |
| # Verify blob files were created |
| file_names = [f.file_name for f in commit_messages[0].new_files] |
| self.assertGreater( |
| len([f for f in file_names if f.endswith('.blob')]), 0, |
| "Should have at least one blob file") |
| |
| # Read row by row using to_iterator |
| read_builder = table.new_read_builder() |
| iterator = read_builder.new_read().to_iterator( |
| read_builder.new_scan().plan().splits()) |
| |
| rows = list(iterator) |
| |
| self.assertEqual(len(rows), 5, "Should have 5 rows") |
| |
| for row in rows: |
| row_id = row.get_field(0) |
| self.assertIn(row_id, expected_data, f"ID {row_id} should be in expected data") |
| |
| expected = expected_data[row_id] |
| self.assertEqual(row.get_field(1), expected['name'], f"Row {row_id}: name should match") |
| |
| row_blob = row.get_field(2) |
| blob_bytes = row_blob.to_data() if isinstance(row_blob, Blob) else row_blob |
| self.assertIsInstance(blob_bytes, bytes, f"Row {row_id}: blob should be bytes") |
| self.assertEqual(blob_bytes, expected['blob'], f"Row {row_id}: blob data should match") |
| self.assertEqual(len(blob_bytes), len(expected['blob']), f"Row {row_id}: blob size should match") |
| |
| self.assertIn( |
| row.get_row_kind(), |
| [RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE], |
| f"Row {row_id}: RowKind should be valid") |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |