################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
import sys
import unittest
import pandas as pd
import pyarrow as pa
from parameterized import parameterized
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.schema.schema import Schema
from pypaimon.read.read_builder import ReadBuilder
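# Table.sort_by is not available in the pyarrow versions that still support
# Python 3.6, so a local compatibility helper is used there.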
if sys.version_info[:2] == (3, 6):
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
else:
def table_sort_by(table: pa.Table, column_name: str, order: str = 'ascending') -> pa.Table:
return table.sort_by([(column_name, order)])
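# File formats to parameterize the tests over; lance is only exercised on Python >= 3.7.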
def get_file_format_params():
if sys.version_info[:2] == (3, 6):
return [('parquet',), ('orc',), ('avro',)]
else:
return [('parquet',), ('orc',), ('avro',), ('lance',)]
class JavaPyReadWriteTest(unittest.TestCase):
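    """Cross-language read/write tests against a shared warehouse.

    Tables with a ``p`` suffix (e.g. ``mixed_test_append_tablep_*``) are written
    here in Python; tables with a ``j`` suffix are expected to have been written
    by the Java counterpart of this test.
    """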
@classmethod
def setUpClass(cls):
cls.tempdir = os.path.abspath(".")
cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
cls.catalog = CatalogFactory.create({
'warehouse': cls.warehouse
})
cls.catalog.create_database('default', True)
@parameterized.expand(get_file_format_params())
def test_py_write_read_append_table(self, file_format):
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('category', pa.string()),
('value', pa.float64()),
('ts', pa.timestamp('us')),
('ts_ltz', pa.timestamp('us', tz='UTC'))
])
schema = Schema.from_pyarrow_schema(
pa_schema,
partition_keys=['category'],
options={'dynamic-partition-overwrite': 'false', 'file.format': file_format}
)
table_name = f'default.mixed_test_append_tablep_{file_format}'
self.catalog.create_table(table_name, schema, False)
table = self.catalog.get_table(table_name)
initial_data = pd.DataFrame({
'id': [1, 2, 3, 4, 5, 6],
'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'],
'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'],
'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0],
'ts': pd.to_datetime([1000000, 1000001, 1000002, 1000003, 1000004, 1000005], unit='ms'),
'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True)
})
# Write initial data
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_pandas(initial_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
# Verify initial data
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
initial_result = table_read.to_pandas(table_scan.plan().splits())
        print(f"Format: {file_format}, Result:\n{initial_result}")
self.assertEqual(len(initial_result), 6)
# Data order may vary due to partitioning/bucketing, so compare as sets
expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'}
actual_names = set(initial_result['name'].tolist())
self.assertEqual(actual_names, expected_names)
@parameterized.expand(get_file_format_params())
def test_read_append_table(self, file_format):
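        """Smoke-read the append table expected to be written by the Java side; no content assertions here."""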
        table = self.catalog.get_table(f'default.mixed_test_append_tablej_{file_format}')
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
res = table_read.to_pandas(table_scan.plan().splits())
        print(f"Format: {file_format}, Result:\n{res}")
@parameterized.expand(get_file_format_params())
def test_py_write_read_pk_table(self, file_format):
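        """Write a primary-key table (with nested struct columns where the format supports them) and read it back."""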
        # The Lance format doesn't support timestamp columns, so use a reduced
        # schema here (the nested metadata struct is also only used with the
        # other formats).
if file_format == 'lance':
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('category', pa.string()),
('value', pa.float64())
])
else:
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('category', pa.string()),
('value', pa.float64()),
('ts', pa.timestamp('us')),
('ts_ltz', pa.timestamp('us', tz='UTC')),
('metadata', pa.struct([
pa.field('source', pa.string()),
pa.field('created_at', pa.int64()),
pa.field('location', pa.struct([
pa.field('city', pa.string()),
pa.field('country', pa.string())
]))
]))
])
table_name = f'default.mixed_test_pk_tablep_{file_format}'
schema = Schema.from_pyarrow_schema(
pa_schema,
partition_keys=['category'],
primary_keys=['id'],
options={
'dynamic-partition-overwrite': 'false',
'bucket': '2',
'file.format': file_format,
"orc.timestamp-ltz.legacy.type": "false"
}
)
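        # Drop any leftover table directory from a previous run so that
        # create_table below starts from a clean state.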
try:
existing_table = self.catalog.get_table(table_name)
table_path = self.catalog.get_table_path(existing_table.identifier)
if self.catalog.file_io.exists(table_path):
self.catalog.file_io.delete(table_path, recursive=True)
except Exception:
pass
self.catalog.create_table(table_name, schema, False)
table = self.catalog.get_table(table_name)
        # Reduced data set for Lance: no timestamp or metadata columns, matching
        # the schema above.
if file_format == 'lance':
initial_data = pd.DataFrame({
'id': [1, 2, 3, 4, 5, 6],
'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'],
'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'],
'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0]
})
else:
initial_data = pd.DataFrame({
'id': [1, 2, 3, 4, 5, 6],
'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'],
'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'],
'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0],
'ts': pd.to_datetime([1000000, 1000001, 1000002, 1000003, 1000004, 1000005], unit='ms'),
'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True),
'metadata': [
{'source': 'store1', 'created_at': 1001, 'location': {'city': 'Beijing', 'country': 'China'}},
{'source': 'store1', 'created_at': 1002, 'location': {'city': 'Shanghai', 'country': 'China'}},
{'source': 'store2', 'created_at': 1003, 'location': {'city': 'Tokyo', 'country': 'Japan'}},
{'source': 'store2', 'created_at': 1004, 'location': {'city': 'Seoul', 'country': 'Korea'}},
{'source': 'store3', 'created_at': 1005, 'location': {'city': 'NewYork', 'country': 'USA'}},
{'source': 'store3', 'created_at': 1006, 'location': {'city': 'London', 'country': 'UK'}}
]
})
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_pandas(initial_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
initial_result = table_read.to_pandas(table_scan.plan().splits())
print(f"Format: {file_format}, Result:\n{initial_result}")
self.assertEqual(len(initial_result), 6)
# Data order may vary due to partitioning/bucketing, so compare as sets
expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'}
actual_names = set(initial_result['name'].tolist())
self.assertEqual(actual_names, expected_names)
@parameterized.expand(get_file_format_params())
def test_read_pk_table(self, file_format):
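        """Read the primary-key table expected to be written by the Java side; verify field types and row content."""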
# Skip ORC format for Python < 3.8 due to pyarrow limitation with TIMESTAMP_INSTANT
if sys.version_info[:2] < (3, 8) and file_format == 'orc':
self.skipTest("Skipping ORC format for Python < 3.8 (pyarrow does not support TIMESTAMP_INSTANT)")
table_name = f'default.mixed_test_pk_tablej_{file_format}'
table = self.catalog.get_table(table_name)
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
res = table_read.to_pandas(table_scan.plan().splits())
print(f"Format: {file_format}, Result:\n{res}")
# Verify data
self.assertEqual(len(res), 6)
if file_format != "lance":
self.assertEqual(table.fields[4].type.type, "TIMESTAMP(6)")
self.assertEqual(table.fields[5].type.type, "TIMESTAMP(6) WITH LOCAL TIME ZONE")
from pypaimon.schema.data_types import RowType
self.assertIsInstance(table.fields[6].type, RowType)
metadata_fields = table.fields[6].type.fields
self.assertEqual(len(metadata_fields), 3)
self.assertEqual(metadata_fields[0].name, 'source')
self.assertEqual(metadata_fields[1].name, 'created_at')
self.assertEqual(metadata_fields[2].name, 'location')
self.assertIsInstance(metadata_fields[2].type, RowType)
# Data order may vary due to partitioning/bucketing, so compare as sets
expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'}
actual_names = set(res['name'].tolist())
self.assertEqual(actual_names, expected_names)
# Verify metadata column can be read and contains nested structures
if 'metadata' in res.columns:
self.assertFalse(res['metadata'].isnull().all())
        # For primary-key tables, verify that _VALUE_KIND is written correctly.
        # A normal read filters out system fields, so this is verified through the
        # Java read, which explicitly reads KeyValue objects and checks valueKind.
        print(f"Format: {file_format}, Python read completed. ValueKind verification should be done in the Java test.")
def test_pk_dv_read(self):
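        """Read a primary-key table with deletion vectors.

        The table is created with ignore-if-exists semantics; the rows and the
        deletion vectors themselves are expected to be produced by the Java side.
        """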
pa_schema = pa.schema([
pa.field('pt', pa.int32(), nullable=False),
pa.field('a', pa.int32(), nullable=False),
            pa.field('b', pa.int64())
])
schema = Schema.from_pyarrow_schema(pa_schema,
partition_keys=['pt'],
primary_keys=['pt', 'a'],
options={'bucket': '1'})
self.catalog.create_table('default.test_pk_dv', schema, True)
table = self.catalog.get_table('default.test_pk_dv')
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'pt')
expected = pa.Table.from_pydict({
'pt': [1, 2, 2],
'a': [10, 21, 22],
'b': [1000, 20001, 202]
}, schema=pa_schema)
self.assertEqual(expected, actual)
def test_pk_dv_read_multi_batch(self):
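        """Deletion-vector read spanning multiple record batches.

        The Java side is expected to write 10,000 rows (a = i * 10) and delete
        the single row with a == 81930 via a deletion vector, leaving 9,999.
        """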
pa_schema = pa.schema([
pa.field('pt', pa.int32(), nullable=False),
pa.field('a', pa.int32(), nullable=False),
            pa.field('b', pa.int64())
])
schema = Schema.from_pyarrow_schema(pa_schema,
partition_keys=['pt'],
primary_keys=['pt', 'a'],
options={'bucket': '1'})
self.catalog.create_table('default.test_pk_dv_multi_batch', schema, True)
table = self.catalog.get_table('default.test_pk_dv_multi_batch')
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'pt')
expected = pa.Table.from_pydict({
'pt': [1] * 9999,
'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],
'b': [i * 100 for i in range(1, 10001) if i * 10 != 81930]
}, schema=pa_schema)
self.assertEqual(expected, actual)
def test_pk_dv_read_multi_batch_raw_convertable(self):
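        """Same data as test_pk_dv_read_multi_batch, but over splits expected to be raw-convertible."""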
pa_schema = pa.schema([
pa.field('pt', pa.int32(), nullable=False),
pa.field('a', pa.int32(), nullable=False),
            pa.field('b', pa.int64())
])
schema = Schema.from_pyarrow_schema(pa_schema,
partition_keys=['pt'],
primary_keys=['pt', 'a'],
options={'bucket': '1'})
self.catalog.create_table('default.test_pk_dv_raw_convertable', schema, True)
table = self.catalog.get_table('default.test_pk_dv_raw_convertable')
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'pt')
expected = pa.Table.from_pydict({
'pt': [1] * 9999,
'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],
'b': [i * 100 for i in range(1, 10001) if i * 10 != 81930]
}, schema=pa_schema)
self.assertEqual(expected, actual)
def test_read_btree_index_table(self):
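        """BTree index reads over string, int, and bigint keys, plus large-table and null-key cases."""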
self._test_read_btree_index_generic("test_btree_index_string", "k2", pa.string())
self._test_read_btree_index_generic("test_btree_index_int", 200, pa.int32())
self._test_read_btree_index_generic("test_btree_index_bigint", 2000, pa.int64())
self._test_read_btree_index_large()
self._test_read_btree_index_null()
def _test_read_btree_index_generic(self, table_name: str, k, k_type):
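        """Point lookup on a BTree-indexed table: filter on ``k`` and expect a single matching row."""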
table = self.catalog.get_table('default.' + table_name)
read_builder: ReadBuilder = table.new_read_builder()
# read using index
predicate_builder = read_builder.new_predicate_builder()
predicate = predicate_builder.equal('k', k)
read_builder.with_filter(predicate)
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'k')
expected = pa.Table.from_pydict({
'k': [k],
'v': ["v2"]
}, schema=pa.schema([
("k", k_type),
("v", pa.string())
]))
self.assertEqual(expected, actual)
def _test_read_btree_index_large(self):
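        """Equality, between, and in filters against a larger BTree-indexed table."""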
table = self.catalog.get_table('default.test_btree_index_large')
read_builder: ReadBuilder = table.new_read_builder()
predicate_builder = read_builder.new_predicate_builder()
# read equal index
read_builder.with_filter(predicate_builder.equal('k', 'k2'))
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'k')
expected = pa.Table.from_pydict({
'k': ["k2"],
'v': ["v2"]
})
self.assertEqual(expected, actual)
# read between index
read_builder.with_filter(predicate_builder.between('k', 'k990', 'k995'))
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'k')
expected = pa.Table.from_pydict({
'k': ["k990", "k991", "k992", "k993", "k994", "k995"],
'v': ["v990", "v991", "v992", "v993", "v994", "v995"]
})
self.assertEqual(expected, actual)
# read in index
read_builder.with_filter(predicate_builder.is_in('k', ['k990', 'k995']))
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'k')
expected = pa.Table.from_pydict({
'k': ["k990", "k995"],
'v': ["v990", "v995"]
})
self.assertEqual(expected, actual)
def _test_read_btree_index_null(self):
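        """is_null and is_not_null filters against a BTree-indexed table containing null keys."""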
table = self.catalog.get_table('default.test_btree_index_null')
# read is null index
read_builder: ReadBuilder = table.new_read_builder()
predicate_builder = read_builder.new_predicate_builder()
read_builder.with_filter(predicate_builder.is_null('k'))
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'k')
expected = pa.Table.from_pydict({
'k': [None, None],
'v': ["v3", "v5"]
}, schema=pa.schema([
("k", pa.string()),
("v", pa.string())
]))
self.assertEqual(expected, actual)
# read is not null index
read_builder: ReadBuilder = table.new_read_builder()
predicate_builder = read_builder.new_predicate_builder()
read_builder.with_filter(predicate_builder.is_not_null('k'))
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'k')
expected = pa.Table.from_pydict({
'k': ["k1", "k2", "k4"],
'v': ["v1", "v2", "v4"]
})
self.assertEqual(expected, actual)