################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
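"""Example: store a reference to an external OSS object in a Paimon table as a
serialized BlobDescriptor ('blob-as-descriptor' mode), read the descriptor
back, and resolve it to the original bytes."""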
import logging

import pyarrow as pa

from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.common.uri_reader import FileUriReader
from pypaimon.schema.schema import Schema
from pypaimon.table.row.blob import BlobDescriptor, Blob

# Enable debug logging for catalog operations
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(name)s: %(message)s')
def oss_blob_as_descriptor():
    warehouse = 'oss://<your-bucket>/<warehouse-path>'
    catalog = CatalogFactory.create({
        'warehouse': warehouse,
        'fs.oss.endpoint': 'oss-<your-region>.aliyuncs.com',
        'fs.oss.accessKeyId': '<your-ak>',
        'fs.oss.accessKeySecret': '<your-sk>',
        'fs.oss.region': '<your-region>'
    })
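    # Table options (assumed semantics, see the Paimon docs): 'blob-as-descriptor'
    # makes blob columns carry serialized BlobDescriptors instead of the raw
    # bytes; row tracking and data evolution must be enabled for blob handling.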
    pa_schema = pa.schema([
        ('id', pa.int32()),
        ('blob_data', pa.large_binary()),
    ])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
            'blob-as-descriptor': 'true',
            'target-file-size': '100MB'
        }
    )
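    # The True flags request ignore-if-exists behavior, so reruns stay idempotent.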
    catalog.create_database("test_db", True)
    catalog.create_table("test_db.blob_uri_scheme_test", schema, True)
    table = catalog.get_table("test_db.blob_uri_scheme_test")
    # Create external blob file in OSS
    external_blob_uri = f"{warehouse.rstrip('/')}/external_blob_scheme_test.bin"
    blob_content = b'This is external blob data'
    with table.file_io.new_output_stream(external_blob_uri) as out_stream:
        out_stream.write(blob_content)
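    # A BlobDescriptor points at (uri, offset, length), so readers can fetch
    # the referenced bytes lazily instead of copying them into the table.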
    # Create BlobDescriptor with OSS scheme
    blob_descriptor = BlobDescriptor(external_blob_uri, 0, len(blob_content))
    descriptor_bytes = blob_descriptor.serialize()
    # Write the descriptor bytes to the table
    test_data = pa.Table.from_pydict({
        'id': [1],
        'blob_data': [descriptor_bytes]
    }, schema=pa_schema)
    write_builder = table.new_batch_write_builder()
    writer = write_builder.new_write()
    writer.write_arrow(test_data)
    commit_messages = writer.prepare_commit()
    commit = write_builder.new_commit()
    commit.commit(commit_messages)
    writer.close()
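    # Read the row back; with 'blob-as-descriptor' enabled, the blob column
    # yields serialized descriptor bytes rather than the blob content itself.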
    read_builder = table.new_read_builder()
    table_scan = read_builder.new_scan()
    table_read = read_builder.new_read()
    result = table_read.to_arrow(table_scan.plan().splits())
    read_descriptor_bytes = result.column('blob_data').to_pylist()[0]
    new_blob_descriptor = BlobDescriptor.deserialize(read_descriptor_bytes)
print(f"Original URI: {external_blob_uri}")
print(f"Read URI: {new_blob_descriptor.uri}")
assert new_blob_descriptor.uri.startswith('oss://'), \
f"URI scheme should be preserved. Got: {new_blob_descriptor.uri}"
    uri_reader = FileUriReader(table.file_io)
    blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)
    blob_descriptor_from_blob = blob.to_descriptor()
    print(f"Blob descriptor URI from Blob.from_descriptor: {blob_descriptor_from_blob.uri}")
    read_data = blob.to_data()
    assert read_data == blob_content, "Blob data should match original content"
    print("✅ All assertions passed!")
if __name__ == '__main__':
    oss_blob_as_descriptor()