blob: e7ad8273d9a13a673f92eeb90547a6e162e635a4 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import shutil
import tempfile
import unittest
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
class ExternalStorageBlobValidationTest(unittest.TestCase):
    """Schema-validation tests for the ``blob-external-storage-field`` option.

    Each test builds a schema with one particular configuration of the blob
    external-storage options and checks whether ``catalog.create_table``
    rejects it with a ``ValueError`` or accepts it.
    """

    # Options shared by every schema below; they come first in each options dict.
    _BASE_OPTIONS = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    }

    @classmethod
    def setUpClass(cls):
        # A single temporary warehouse and database serve all tests in the class.
        cls.temp_dir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
        cls.catalog.create_database('test_db', False)

    @classmethod
    def tearDownClass(cls):
        # Cleanup is best-effort: failing to delete the temp dir is ignored.
        try:
            shutil.rmtree(cls.temp_dir)
        except OSError:
            pass

    @classmethod
    def _options(cls, extra):
        """Return a new dict holding ``_BASE_OPTIONS`` followed by ``extra``."""
        return {**cls._BASE_OPTIONS, **extra}

    def _assert_rejected(self, identifier, schema, fragment):
        """Check that creating ``identifier`` raises ValueError mentioning ``fragment``."""
        with self.assertRaises(ValueError) as ctx:
            self.catalog.create_table(identifier, schema, False)
        self.assertIn(fragment, str(ctx.exception))

    def test_validation_missing_path(self):
        """An external-storage field without blob-external-storage-path is rejected."""
        arrow_schema = pa.schema([
            pa.field('id', pa.int32()),
            pa.field('video', pa.large_binary()),
        ])
        options = self._options({
            'blob-descriptor-field': 'video',
            'blob-external-storage-field': 'video',
            # blob-external-storage-path deliberately omitted.
        })
        schema = Schema.from_pyarrow_schema(arrow_schema, options=options)
        self._assert_rejected(
            'test_db.missing_path_test', schema, 'blob-external-storage-path')

    def test_validation_field_not_in_descriptor_field(self):
        """External-storage fields must also be listed in blob-descriptor-field."""
        arrow_schema = pa.schema([
            pa.field('id', pa.int32()),
            pa.field('video', pa.large_binary()),
        ])
        options = self._options({
            # blob-descriptor-field deliberately omitted.
            'blob-external-storage-field': 'video',
            'blob-external-storage-path': os.path.join(self.temp_dir, 'external_storage'),
        })
        schema = Schema.from_pyarrow_schema(arrow_schema, options=options)
        self._assert_rejected(
            'test_db.not_in_descriptor_test', schema, 'blob-descriptor-field')

    def test_validation_field_not_blob_type(self):
        """Naming a non-BLOB column as an external-storage field is rejected."""
        arrow_schema = pa.schema([
            pa.field('id', pa.int32()),
            pa.field('name', pa.string()),
            pa.field('video', pa.large_binary()),
        ])
        options = self._options({
            'blob-descriptor-field': 'name,video',
            'blob-external-storage-field': 'name',
            'blob-external-storage-path': os.path.join(self.temp_dir, 'external_storage'),
        })
        schema = Schema.from_pyarrow_schema(arrow_schema, options=options)
        self._assert_rejected(
            'test_db.not_blob_type_test', schema, 'must be blob fields')

    def test_validation_blob_not_null_field_passes(self):
        """A BLOB NOT NULL field is still a BLOB and must pass validation."""
        from pypaimon.schema.data_types import AtomicType, DataField

        fields = [
            DataField(0, 'id', AtomicType('INT', nullable=False)),
            DataField(1, 'video', AtomicType('BLOB', nullable=False)),
        ]
        options = self._options({
            'blob-descriptor-field': 'video',
            'blob-external-storage-field': 'video',
            'blob-external-storage-path': os.path.join(self.temp_dir, 'external_storage'),
        })
        # Must not raise: nullability must not affect the BLOB type check.
        self.catalog.create_table(
            'test_db.blob_not_null_test', Schema(fields=fields, options=options), False)
        self.assertIsNotNone(self.catalog.get_table('test_db.blob_not_null_test'))
class ExternalStorageBlobWriteTest(unittest.TestCase):
    """Write/read round-trip tests for blob external storage.

    Every table lives in one temporary warehouse. Unless a test passes its
    own ``blob-external-storage-path``, the external blobs for all tables go
    to the shared ``cls.external_path`` directory.
    """

    @classmethod
    def setUpClass(cls):
        cls.temp_dir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
        cls.external_path = os.path.join(cls.temp_dir, 'external_storage')
        os.makedirs(cls.external_path, exist_ok=True)
        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
        cls.catalog.create_database('test_db', False)

    @classmethod
    def tearDownClass(cls):
        # Best-effort cleanup; a leftover temp dir must not fail the suite.
        try:
            shutil.rmtree(cls.temp_dir)
        except OSError:
            pass

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _default_pa_schema():
        """Arrow schema (id, name, video) used by ``_create_external_storage_table``."""
        return pa.schema([
            ('id', pa.int32()),
            ('name', pa.string()),
            ('video', pa.large_binary()),
        ])

    def _create_table(self, table_name, pa_schema, options):
        """Create ``test_db.<table_name>`` from an Arrow schema and return the table."""
        schema = Schema.from_pyarrow_schema(pa_schema, options=options)
        self.catalog.create_table(f'test_db.{table_name}', schema, False)
        return self.catalog.get_table(f'test_db.{table_name}')

    def _create_external_storage_table(self, table_name, extra_options=None):
        """Create an (id, name, video) table whose ``video`` column uses external storage.

        ``extra_options`` is merged over the defaults, so it may override any
        of them (e.g. ``blob-external-storage-path``).
        """
        options = {
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
            'blob-descriptor-field': 'video',
            'blob-external-storage-field': 'video',
            'blob-external-storage-path': self.external_path,
        }
        if extra_options:
            options.update(extra_options)
        return self._create_table(table_name, self._default_pa_schema(), options)

    @staticmethod
    def _write_and_commit(table, data):
        """Write ``data`` as one batch, commit it, and return the commit messages."""
        write_builder = table.new_batch_write_builder()
        writer = write_builder.new_write()
        try:
            writer.write_arrow(data)
            commit_messages = writer.prepare_commit()
        finally:
            # Close the writer even when writing fails, so it is not leaked.
            writer.close()
        write_builder.new_commit().commit(commit_messages)
        return commit_messages

    @staticmethod
    def _read_all(table):
        """Plan a full scan of ``table`` and read every split into one Arrow table."""
        read_builder = table.new_read_builder()
        splits = read_builder.new_scan().plan().splits()
        return read_builder.new_read().to_arrow(splits)

    @staticmethod
    def _values_by_id(result, column):
        """Map each row's ``id`` to its ``column`` value, independent of row order."""
        return dict(zip(result.column('id').to_pylist(),
                        result.column(column).to_pylist()))

    @staticmethod
    def _blob_files(root_dir):
        """Return the paths of all ``.blob`` files found under ``root_dir``."""
        return [
            os.path.join(root, name)
            for root, _dirs, files in os.walk(root_dir)
            for name in files
            if name.endswith('.blob')
        ]

    # -------------------------------------------------------------------- tests

    def test_external_storage_basic_write(self):
        """Basic write: raw blob data should be written to external storage as .blob files."""
        # Use a directory no other test writes to. With the shared external_path,
        # .blob files written by other tests would satisfy the check below.
        external_path = os.path.join(self.temp_dir, 'basic_write_external')
        os.makedirs(external_path, exist_ok=True)
        table = self._create_external_storage_table(
            'basic_write_test',
            extra_options={'blob-external-storage-path': external_path})
        test_data = pa.Table.from_pydict({
            'id': [1, 2, 3],
            'name': ['a', 'b', 'c'],
            'video': [b'video_data_1', b'video_data_2', b'video_data_3'],
        }, schema=self._default_pa_schema())

        commit_messages = self._write_and_commit(table, test_data)

        self.assertGreater(len(commit_messages), 0)
        self.assertGreater(len(self._blob_files(external_path)), 0,
                           "External blob files should be created")

    def test_external_storage_roundtrip(self):
        """Write raw blob data via external storage, read back should return original data."""
        table = self._create_external_storage_table('roundtrip_test')
        video_bytes = b'hello_world_video_content'
        test_data = pa.Table.from_pydict({
            'id': [1],
            'name': ['test'],
            'video': [video_bytes],
        }, schema=self._default_pa_schema())

        self._write_and_commit(table, test_data)

        # The reader is expected to resolve the stored BlobDescriptor back to the raw bytes.
        result = self._read_all(table)
        self.assertEqual(result.num_rows, 1)
        self.assertEqual(result.column('video')[0].as_py(), video_bytes)

    def test_external_storage_multiple_fields(self):
        """Multiple external storage fields should each round-trip with the right row."""
        pa_schema = pa.schema([
            ('id', pa.int32()),
            ('video', pa.large_binary()),
            ('audio', pa.large_binary()),
        ])
        table = self._create_table('multi_field_test', pa_schema, {
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
            'blob-descriptor-field': 'video,audio',
            'blob-external-storage-field': 'video,audio',
            'blob-external-storage-path': self.external_path,
        })
        test_data = pa.Table.from_pydict({
            'id': [1, 2],
            'video': [b'video1', b'video2'],
            'audio': [b'audio1', b'audio2'],
        }, schema=pa_schema)

        self._write_and_commit(table, test_data)

        result = self._read_all(table)
        self.assertEqual(result.num_rows, 2)
        # Compare per id (not as sets) so values swapped between rows are caught.
        self.assertEqual(self._values_by_id(result, 'video'), {1: b'video1', 2: b'video2'})
        self.assertEqual(self._values_by_id(result, 'audio'), {1: b'audio1', 2: b'audio2'})

    def test_external_storage_mixed_with_normal_blob(self):
        """External storage field + normal blob field should coexist."""
        pa_schema = pa.schema([
            ('id', pa.int32()),
            ('video', pa.large_binary()),      # external storage
            ('thumbnail', pa.large_binary()),  # not in blob-descriptor-field: normal blob
        ])
        table = self._create_table('mixed_blob_test', pa_schema, {
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
            'blob-descriptor-field': 'video',
            'blob-external-storage-field': 'video',
            'blob-external-storage-path': self.external_path,
        })
        test_data = pa.Table.from_pydict({
            'id': [1, 2],
            'video': [b'big_video_data', b'another_video'],
            'thumbnail': [b'thumb1', b'thumb2'],
        }, schema=pa_schema)

        self._write_and_commit(table, test_data)

        result = self._read_all(table)
        self.assertEqual(result.num_rows, 2)
        # Compare per id (not as sets) so values swapped between rows are caught.
        self.assertEqual(self._values_by_id(result, 'video'),
                         {1: b'big_video_data', 2: b'another_video'})
        self.assertEqual(self._values_by_id(result, 'thumbnail'),
                         {1: b'thumb1', 2: b'thumb2'})

    def test_external_storage_null_values(self):
        """Null blob values should remain null (not written to external storage)."""
        table = self._create_external_storage_table('null_test')
        test_data = pa.Table.from_pydict({
            'id': [1, 2, 3],
            'name': ['a', 'b', 'c'],
            'video': [b'data', None, b'more_data'],
        }, schema=self._default_pa_schema())

        self._write_and_commit(table, test_data)

        result = self._read_all(table)
        self.assertEqual(result.num_rows, 3)
        self.assertEqual(self._values_by_id(result, 'video'),
                         {1: b'data', 2: None, 3: b'more_data'})

    def test_external_storage_with_descriptor_input(self):
        """When input is serialized BlobDescriptor bytes, the writer should read
        the source data via BlobRef and re-write it to external storage."""
        from pypaimon.table.row.blob import BlobDescriptor
        table = self._create_external_storage_table('descriptor_input_test')

        # Keep the source file out of the external-storage directory under test.
        source_dir = os.path.join(self.temp_dir, 'descriptor_source')
        os.makedirs(source_dir, exist_ok=True)
        source_data = b'original_video_from_descriptor'
        source_file = os.path.join(source_dir, 'source.bin')
        with open(source_file, 'wb') as f:
            f.write(source_data)

        # Descriptor covering the whole source file (offset 0, full length).
        descriptor = BlobDescriptor(source_file, 0, len(source_data))
        test_data = pa.Table.from_pydict({
            'id': [1],
            'name': ['desc_test'],
            'video': [descriptor.serialize()],
        }, schema=self._default_pa_schema())

        self._write_and_commit(table, test_data)

        result = self._read_all(table)
        self.assertEqual(result.num_rows, 1)
        self.assertEqual(result.column('video')[0].as_py(), source_data)
if __name__ == '__main__':
    # Running this file directly executes every TestCase defined in it.
    unittest.main(module='__main__')