| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import os |
| import shutil |
| import tempfile |
| import unittest |
| from io import StringIO |
| from unittest.mock import patch |
| |
| import pyarrow as pa |
| |
| from pypaimon import CatalogFactory, Schema |
| from pypaimon.cli.cli import main |
| |
| |
| class CliTableTest(unittest.TestCase): |
| """Integration tests for CLI with real catalog and table operations.""" |
| |
@classmethod
def setUpClass(cls):
    """Create the shared catalog, database, sample table and CLI config file.

    Everything is placed under a fresh temporary directory stored on
    ``cls.tempdir``.  unittest does not invoke ``tearDownClass`` when
    ``setUpClass`` raises, so on failure the directory is removed here
    before the original error is re-raised.
    """
    cls.tempdir = tempfile.mkdtemp()
    try:
        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')

        # Create catalog
        cls.catalog = CatalogFactory.create({
            'warehouse': cls.warehouse
        })
        cls.catalog.create_database('test_db', True)

        # Create test table with sample data
        cls._create_test_table()

        # Create catalog config file used by the ``-c`` CLI argument in the tests
        cls.config_file = os.path.join(cls.tempdir, 'paimon.yaml')
        with open(cls.config_file, 'w') as f:
            f.write(f"metastore: filesystem\nwarehouse: {cls.warehouse}\n")
    except Exception:
        # tearDownClass will not run after a failed setUpClass; clean up now.
        shutil.rmtree(cls.tempdir, ignore_errors=True)
        raise
| |
@classmethod
def tearDownClass(cls):
    """Delete the class-level temporary directory tree, ignoring removal errors."""
    scratch_dir = cls.tempdir
    shutil.rmtree(scratch_dir, ignore_errors=True)
| |
@classmethod
def _create_test_table(cls):
    """Create ``test_db.users`` and commit five sample rows to it.

    The writer and the committer are closed in ``finally`` blocks so they
    are released even when writing or committing raises.
    """
    # Define schema
    pa_schema = pa.schema([
        ('id', pa.int32()),
        ('name', pa.string()),
        ('age', pa.int32()),
        ('city', pa.string())
    ])

    schema = Schema.from_pyarrow_schema(pa_schema)
    cls.catalog.create_table('test_db.users', schema, False)

    # Create sample data
    data = {
        'id': [1, 2, 3, 4, 5],
        'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
        'age': [25, 30, 35, 28, 32],
        'city': ['Beijing', 'Shanghai', 'Guangzhou', 'Shenzhen', 'Hangzhou']
    }
    table_data = pa.Table.from_pydict(data, schema=pa_schema)

    # Get table and write data
    table = cls.catalog.get_table('test_db.users')
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    try:
        table_write.write_arrow(table_data)
        table_commit.commit(table_write.prepare_commit())
    finally:
        # Same close order as before (writer, then committer); the nested
        # finally guarantees the committer is closed even if the writer's
        # close() raises.
        try:
            table_write.close()
        finally:
            table_commit.close()
| |
def test_cli_table_read_basic(self):
    """Read ``test_db.users`` through the CLI and check sample rows and header names."""
    # Equivalent command line: paimon -c <config> table read test_db.users
    argv = ['paimon', '-c', self.config_file, 'table', 'read', 'test_db.users']
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    printed = captured.getvalue()

    # Sample values written in the class setup must show up in the output
    for expected in ('Alice', 'Bob', 'Beijing', 'Shanghai'):
        self.assertIn(expected, printed)

    # Header names are compared case-insensitively
    for column in ('id', 'name'):
        self.assertIn(column, printed.lower())
| |
def test_cli_table_read_with_limit(self):
    """Test table read with max results limit (``-l``) via CLI.

    Checks that the command prints something and that the number of
    non-blank output lines stays within header + 2 data rows (+ slack).
    """
    # Simulate CLI command: paimon -c <config> table read test_db.users -l 2
    with patch('sys.argv', ['paimon', '-c', self.config_file, 'table', 'read', 'test_db.users', '-l', '2']):
        with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
            try:
                main()
            except SystemExit:
                pass

            output = mock_stdout.getvalue()

    lines = [line for line in output.split('\n') if line.strip()]
    # SystemExit is swallowed above, so a failing command would print nothing
    # to stdout; without this check the upper bound below passes vacuously.
    self.assertTrue(lines, 'limited read produced no output')
    # Should have header + 2 data rows
    self.assertLessEqual(len(lines), 4)  # header + 2 data rows + possible extra line
| |
def test_cli_table_read_with_select(self):
    """Read ``test_db.users`` with ``--select id,name`` and check the output."""
    # Equivalent command line: paimon -c <config> table read test_db.users --select id,name
    argv = ['paimon', '-c', self.config_file,
            'table', 'read', 'test_db.users', '--select', 'id,name']
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    printed = captured.getvalue()

    # The selected column names appear (case-insensitive match)
    for column in ('id', 'name'):
        self.assertIn(column, printed.lower())
    # Values from the selected 'name' column appear
    for person in ('Alice', 'Bob'):
        self.assertIn(person, printed)
| |
def test_cli_table_read_with_invalid_select(self):
    """Read with two non-existent columns in ``--select`` and check the stderr message."""
    # Equivalent command line:
    #   paimon -c <config> table read test_db.users --select invalid_col1,invalid_col2
    argv = ['paimon', '-c', self.config_file,
            'table', 'read', 'test_db.users', '--select', 'invalid_col1,invalid_col2']
    with patch('sys.argv', argv), patch('sys.stderr', new_callable=StringIO) as captured_err:
        try:
            main()
        except SystemExit:
            pass

    reported = captured_err.getvalue()

    # The error text names both invalid columns
    expected_message = "Column(s) ['invalid_col1', 'invalid_col2'] do not exist in table"
    self.assertIn(expected_message, reported)
| |
def test_cli_with_custom_config_path(self):
    """Run a table read using a second config file passed through ``-c``."""
    # Write a separate config file that points at the same warehouse
    alt_config = os.path.join(self.tempdir, 'custom_catalog.yaml')
    with open(alt_config, 'w') as handle:
        handle.write(f"metastore: filesystem\nwarehouse: {self.warehouse}\n")

    argv = ['paimon', '-c', alt_config, 'table', 'read', 'test_db.users']
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    self.assertIn('Alice', captured.getvalue())
| |
def test_cli_table_get_basic(self):
    """Run ``table get`` on ``test_db.users`` and check the JSON schema it prints."""
    import json

    # Equivalent command line: paimon -c <config> table get test_db.users
    argv = ['paimon', '-c', self.config_file, 'table', 'get', 'test_db.users']
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    # Parsing fails the test if stdout is not valid JSON
    parsed = json.loads(captured.getvalue())

    # The schema exposes a list under 'fields'
    self.assertIn('fields', parsed)
    self.assertIsInstance(parsed['fields'], list)

    # All four columns of the sample table are listed
    listed_names = [entry['name'] for entry in parsed['fields']]
    for column in ('id', 'name', 'age', 'city'):
        self.assertIn(column, listed_names)
| |
def test_cli_table_snapshot_basic(self):
    """Run ``table snapshot`` on ``test_db.users`` and check the JSON it prints."""
    import json

    # Equivalent command line: paimon -c <config> table snapshot test_db.users
    argv = ['paimon', '-c', self.config_file, 'table', 'snapshot', 'test_db.users']
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    # Parsing fails the test if stdout is not valid JSON
    snapshot = json.loads(captured.getvalue())

    # Keys the snapshot document is expected to carry
    expected_keys = (
        'id',
        'schemaId',
        'commitKind',
        'timeMillis',
        'totalRecordCount',
        'deltaRecordCount',
    )
    for key in expected_keys:
        self.assertIn(key, snapshot)

    # The id and the total record count must both be positive
    self.assertGreater(snapshot['id'], 0)
    self.assertGreater(snapshot['totalRecordCount'], 0)
| |
def test_cli_table_create_with_json_schema(self):
    """Create ``test_db.orders`` from a JSON schema file and inspect the resulting table."""
    import json

    # Write the schema definition the CLI will read via ``-s``
    schema_path = os.path.join(self.tempdir, 'test_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'order_id', 'type': 'BIGINT'},
            {'id': 1, 'name': 'customer_id', 'type': 'INT'},
            {'id': 2, 'name': 'amount', 'type': 'DOUBLE'}
        ],
        'partitionKeys': ['customer_id'],
        'options': {'bucket': '3'}
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)

    argv = ['paimon', '-c', self.config_file, 'table', 'create', 'test_db.orders', '-s', schema_path]
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    self.assertIn('created successfully', captured.getvalue())

    # The catalog now holds the table with three fields and the partition key
    created_schema = self.catalog.get_table('test_db.orders').table_schema
    self.assertEqual(len(created_schema.fields), 3)
    self.assertIn('customer_id', created_schema.partition_keys)
| |
def test_cli_table_import_csv(self):
    """Import a three-row CSV file into ``test_db.users`` via the CLI, then read it back."""
    import pandas as pd

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create a CSV file with test data
    csv_path = os.path.join(self.tempdir, 'test_import.csv')
    frame = pd.DataFrame({
        'id': [10, 11, 12],
        'name': ['Frank', 'Grace', 'Henry'],
        'age': [40, 45, 50],
        'city': ['Tokyo', 'Seoul', 'Singapore']
    })
    frame.to_csv(csv_path, index=False)

    # Equivalent command line: paimon -c <config> table import test_db.users -i <csv file>
    import_output = run_table_cli('import', 'test_db.users', '-i', csv_path)
    self.assertIn('Successfully imported', import_output)
    self.assertIn('3 rows', import_output)

    # Read the table back and look for the newly imported names
    read_output = run_table_cli('read', 'test_db.users')
    for imported_name in ('Frank', 'Grace', 'Henry'):
        self.assertIn(imported_name, read_output)
| |
def test_cli_table_import_json(self):
    """Import a two-row JSON file into ``test_db.users`` via the CLI, then read it back."""
    import pandas as pd

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create a JSON file (records orientation) with test data
    json_path = os.path.join(self.tempdir, 'test_import.json')
    frame = pd.DataFrame({
        'id': [20, 21],
        'name': ['Ivy', 'Jack'],
        'age': [55, 60],
        'city': ['Sydney', 'Melbourne']
    })
    frame.to_json(json_path, orient='records')

    # Equivalent command line: paimon -c <config> table import test_db.users -i <json file>
    import_output = run_table_cli('import', 'test_db.users', '-i', json_path)
    self.assertIn('Successfully imported', import_output)
    self.assertIn('2 rows', import_output)

    # Read the table back and look for the newly imported names
    read_output = run_table_cli('read', 'test_db.users')
    for imported_name in ('Ivy', 'Jack'):
        self.assertIn(imported_name, read_output)
| |
def test_cli_table_drop_basic(self):
    """Create ``test_db.table_to_drop`` via the CLI, drop it, and confirm it is gone."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Schema file for the table that will be dropped
    schema_path = os.path.join(self.tempdir, 'drop_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'name', 'type': 'STRING'}
        ],
        'primaryKeys': ['id']
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)

    # Create the table first
    run_table_cli('create', 'test_db.table_to_drop', '-s', schema_path)

    # The catalog must be able to load it before the drop
    self.assertIsNotNone(self.catalog.get_table('test_db.table_to_drop'))

    # Drop the table
    drop_output = run_table_cli('drop', 'test_db.table_to_drop')
    self.assertIn('dropped successfully', drop_output)

    # Loading the table now has to fail
    with self.assertRaises(Exception):
        self.catalog.get_table('test_db.table_to_drop')
| |
def test_cli_table_alter_set_option(self):
    """Set a table option with ``table alter ... set-option`` and check the stored value."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create test table
    schema_path = os.path.join(self.tempdir, 'alter_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'value', 'type': 'STRING'}
        ]
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)
    run_table_cli('create', 'test_db.alter_set_opt', '-s', schema_path, '-i')

    alter_output = run_table_cli(
        'alter', 'test_db.alter_set_opt',
        'set-option', '-k', 'snapshot.num-retained-max', '-v', '10')
    self.assertIn('altered successfully', alter_output)

    # Verify option was set
    altered = self.catalog.get_table('test_db.alter_set_opt')
    self.assertEqual(altered.table_schema.options.get('snapshot.num-retained-max'), '10')
| |
def test_cli_table_alter_remove_option(self):
    """Set then remove a table option via ``table alter`` and check it is absent."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create test table
    schema_path = os.path.join(self.tempdir, 'alter_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'value', 'type': 'STRING'}
        ]
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)
    run_table_cli('create', 'test_db.alter_rm_opt', '-s', schema_path, '-i')

    # First set an option
    run_table_cli(
        'alter', 'test_db.alter_rm_opt',
        'set-option', '-k', 'test.option', '-v', 'test_value')

    # Then remove it
    remove_output = run_table_cli(
        'alter', 'test_db.alter_rm_opt',
        'remove-option', '-k', 'test.option')
    self.assertIn('altered successfully', remove_output)

    # Verify option was removed
    altered = self.catalog.get_table('test_db.alter_rm_opt')
    self.assertNotIn('test.option', altered.table_schema.options)
| |
def test_cli_table_alter_add_column(self):
    """Add an ``email`` column via ``table alter ... add-column`` and check the schema."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create test table
    schema_path = os.path.join(self.tempdir, 'alter_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'value', 'type': 'STRING'}
        ]
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)
    run_table_cli('create', 'test_db.alter_add_col', '-s', schema_path, '-i')

    add_output = run_table_cli(
        'alter', 'test_db.alter_add_col',
        'add-column', '-n', 'email', '-t', 'STRING')
    self.assertIn('altered successfully', add_output)

    # Verify column was added
    altered = self.catalog.get_table('test_db.alter_add_col')
    column_names = [column.name for column in altered.table_schema.fields]
    self.assertIn('email', column_names)
| |
def test_cli_table_alter_drop_column(self):
    """Add then drop ``temp_col`` via ``table alter`` and check it is gone from the schema."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create test table
    schema_path = os.path.join(self.tempdir, 'alter_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'value', 'type': 'STRING'}
        ]
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)
    run_table_cli('create', 'test_db.alter_drop_col', '-s', schema_path, '-i')

    # First add a column to drop
    run_table_cli(
        'alter', 'test_db.alter_drop_col',
        'add-column', '-n', 'temp_col', '-t', 'INT')

    # Drop the column
    drop_output = run_table_cli(
        'alter', 'test_db.alter_drop_col',
        'drop-column', '-n', 'temp_col')
    self.assertIn('altered successfully', drop_output)

    # Verify column was dropped
    altered = self.catalog.get_table('test_db.alter_drop_col')
    column_names = [column.name for column in altered.table_schema.fields]
    self.assertNotIn('temp_col', column_names)
| |
def test_cli_table_alter_rename_column(self):
    """Add ``rename_me``, rename it to ``renamed_col`` via the CLI, and check the schema."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create test table
    schema_path = os.path.join(self.tempdir, 'alter_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'value', 'type': 'STRING'}
        ]
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)
    run_table_cli('create', 'test_db.alter_rename_col', '-s', schema_path, '-i')

    # First add a column to rename
    run_table_cli(
        'alter', 'test_db.alter_rename_col',
        'add-column', '-n', 'rename_me', '-t', 'STRING')

    # Rename the column
    rename_output = run_table_cli(
        'alter', 'test_db.alter_rename_col',
        'rename-column', '-n', 'rename_me', '-m', 'renamed_col')
    self.assertIn('altered successfully', rename_output)

    # Verify column was renamed: old name gone, new name present
    altered = self.catalog.get_table('test_db.alter_rename_col')
    column_names = [column.name for column in altered.table_schema.fields]
    self.assertNotIn('rename_me', column_names)
    self.assertIn('renamed_col', column_names)
| |
def test_cli_table_alter_column_type(self):
    """Change the ``score`` column from INT to BIGINT via ``alter-column -t``."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create the table whose column type will be altered
    schema_path = os.path.join(self.tempdir, 'alter_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'score', 'type': 'INT'}
        ]
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)
    run_table_cli('create', 'test_db.alter_col_type', '-s', schema_path, '-i')

    alter_output = run_table_cli(
        'alter', 'test_db.alter_col_type',
        'alter-column', '-n', 'score', '-t', 'BIGINT')
    self.assertIn('altered successfully', alter_output)

    # The 'score' field must now report the new type
    altered = self.catalog.get_table('test_db.alter_col_type')
    matching = [column for column in altered.table_schema.fields if column.name == 'score']
    score_column = matching[0]
    self.assertEqual(score_column.type.type, 'BIGINT')
| |
def test_cli_table_alter_column_comment(self):
    """Set a comment on the ``name`` column via ``alter-column -c`` and check its description."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create the table whose column comment will be altered
    schema_path = os.path.join(self.tempdir, 'alter_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'name', 'type': 'STRING'}
        ]
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)
    run_table_cli('create', 'test_db.alter_col_comment', '-s', schema_path, '-i')

    alter_output = run_table_cli(
        'alter', 'test_db.alter_col_comment',
        'alter-column', '-n', 'name', '-c', 'User full name')
    self.assertIn('altered successfully', alter_output)

    # The 'name' field must now carry the new description
    altered = self.catalog.get_table('test_db.alter_col_comment')
    matching = [column for column in altered.table_schema.fields if column.name == 'name']
    name_column = matching[0]
    self.assertEqual(name_column.description, 'User full name')
| |
def test_cli_table_alter_column_position(self):
    """Move the ``age`` column with ``--first`` and then ``--after name`` via ``alter-column``."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create a three-column table to reorder
    schema_path = os.path.join(self.tempdir, 'alter_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'name', 'type': 'STRING'},
            {'id': 2, 'name': 'age', 'type': 'INT'}
        ]
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)
    run_table_cli('create', 'test_db.alter_col_pos', '-s', schema_path, '-i')

    # Move 'age' to first position
    first_output = run_table_cli(
        'alter', 'test_db.alter_col_pos',
        'alter-column', '-n', 'age', '--first')
    self.assertIn('altered successfully', first_output)

    reloaded = self.catalog.get_table('test_db.alter_col_pos')
    column_names = [column.name for column in reloaded.table_schema.fields]
    self.assertEqual(column_names[0], 'age')

    # Move 'age' after 'name'
    after_output = run_table_cli(
        'alter', 'test_db.alter_col_pos',
        'alter-column', '-n', 'age', '--after', 'name')
    self.assertIn('altered successfully', after_output)

    reloaded = self.catalog.get_table('test_db.alter_col_pos')
    column_names = [column.name for column in reloaded.table_schema.fields]
    name_position = column_names.index('name')
    age_position = column_names.index('age')
    # 'age' must sit immediately after 'name'
    self.assertEqual(age_position, name_position + 1)
| |
def test_cli_table_alter_update_comment(self):
    """Replace the table comment via ``table alter ... update-comment`` and check it."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create a table that starts with an initial comment
    schema_path = os.path.join(self.tempdir, 'alter_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'value', 'type': 'STRING'}
        ],
        'comment': 'original comment'
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)
    run_table_cli('create', 'test_db.alter_comment', '-s', schema_path, '-i')

    update_output = run_table_cli(
        'alter', 'test_db.alter_comment',
        'update-comment', '-c', 'Updated table comment')
    self.assertIn('altered successfully', update_output)

    # The table schema must now carry the new comment
    altered = self.catalog.get_table('test_db.alter_comment')
    self.assertEqual(altered.table_schema.comment, 'Updated table comment')
| |
def test_cli_table_alter_add_column_with_position(self):
    """Add columns with ``--first`` and ``--after id`` via ``add-column`` and check their order."""
    import json

    def run_table_cli(*table_args):
        # Run ``paimon -c <config> table <table_args...>`` and return the captured stdout.
        argv = ['paimon', '-c', self.config_file, 'table', *table_args]
        with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
            try:
                main()
            except SystemExit:
                pass
        return captured.getvalue()

    # Create a two-column table to extend
    schema_path = os.path.join(self.tempdir, 'alter_schema.json')
    definition = {
        'fields': [
            {'id': 0, 'name': 'id', 'type': 'INT'},
            {'id': 1, 'name': 'name', 'type': 'STRING'}
        ]
    }
    with open(schema_path, 'w') as schema_fp:
        json.dump(definition, schema_fp)
    run_table_cli('create', 'test_db.alter_add_pos', '-s', schema_path, '-i')

    # Add column as first
    first_output = run_table_cli(
        'alter', 'test_db.alter_add_pos',
        'add-column', '-n', 'first_col', '-t', 'INT', '--first')
    self.assertIn('altered successfully', first_output)

    reloaded = self.catalog.get_table('test_db.alter_add_pos')
    column_names = [column.name for column in reloaded.table_schema.fields]
    self.assertEqual(column_names[0], 'first_col')

    # Add column after 'id'
    after_output = run_table_cli(
        'alter', 'test_db.alter_add_pos',
        'add-column', '-n', 'after_id_col', '-t', 'STRING', '--after', 'id')
    self.assertIn('altered successfully', after_output)

    reloaded = self.catalog.get_table('test_db.alter_add_pos')
    column_names = [column.name for column in reloaded.table_schema.fields]
    id_position = column_names.index('id')
    new_column_position = column_names.index('after_id_col')
    # The new column must sit immediately after 'id'
    self.assertEqual(new_column_position, id_position + 1)
| |
def test_cli_table_read_with_where_equal(self):
    """Read ``test_db.users`` with ``--where "name = 'Alice'"`` and check the rows printed."""
    argv = ['paimon', '-c', self.config_file,
            'table', 'read', 'test_db.users', '--where', "name = 'Alice'"]
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    printed = captured.getvalue()
    # Only the matching row is expected
    self.assertIn('Alice', printed)
    for filtered_out in ('Bob', 'Charlie'):
        self.assertNotIn(filtered_out, printed)
| |
def test_cli_table_read_with_where_greater_than(self):
    """Read ``test_db.users`` with ``-w 'age > 30'`` and check the rows printed."""
    argv = ['paimon', '-c', self.config_file,
            'table', 'read', 'test_db.users', '-w', 'age > 30']
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    printed = captured.getvalue()
    # Sample rows with age > 30: Charlie(35), Eve(32)
    for kept in ('Charlie', 'Eve'):
        self.assertIn(kept, printed)
    for filtered_out in ('Alice', 'Bob'):
        self.assertNotIn(filtered_out, printed)
| |
def test_cli_table_read_with_where_and(self):
    """Read ``test_db.users`` with an ``AND`` range condition in ``--where``."""
    argv = ['paimon', '-c', self.config_file,
            'table', 'read', 'test_db.users', '--where', 'age >= 28 AND age <= 32']
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    printed = captured.getvalue()
    # Sample rows with 28 <= age <= 32: Bob(30), David(28), Eve(32)
    for kept in ('Bob', 'David', 'Eve'):
        self.assertIn(kept, printed)
    for filtered_out in ('Alice', 'Charlie'):
        self.assertNotIn(filtered_out, printed)
| |
def test_cli_table_read_with_where_in(self):
    """Read ``test_db.users`` with an ``IN`` list on ``city`` in ``--where``."""
    argv = ['paimon', '-c', self.config_file,
            'table', 'read', 'test_db.users',
            '--where', "city IN ('Beijing', 'Shanghai')"]
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    printed = captured.getvalue()
    # Alice lives in Beijing and Bob in Shanghai; Charlie is in Guangzhou
    for kept in ('Alice', 'Bob'):
        self.assertIn(kept, printed)
    self.assertNotIn('Charlie', printed)
| |
def test_cli_table_read_with_where_and_select(self):
    """Read ``test_db.users`` combining ``--select name,age`` with ``--where 'age > 30'``."""
    argv = ['paimon', '-c', self.config_file,
            'table', 'read', 'test_db.users',
            '--select', 'name,age',
            '--where', 'age > 30']
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    printed = captured.getvalue()
    # Sample rows with age > 30 are Charlie and Eve; Alice (25) is excluded
    for kept in ('Charlie', 'Eve'):
        self.assertIn(kept, printed)
    self.assertNotIn('Alice', printed)
| |
def test_cli_table_read_where_field_not_in_select(self):
    """Filter on ``age`` while selecting only ``name,city`` and check rows and columns."""
    argv = ['paimon', '-c', self.config_file,
            'table', 'read', 'test_db.users',
            '--select', 'name,city',
            '--where', 'age > 30']
    with patch('sys.argv', argv), patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    printed = captured.getvalue()

    # age > 30 keeps Charlie(35, Guangzhou) and Eve(32, Hangzhou);
    # the filter has to apply even though 'age' is not selected
    for kept in ('Charlie', 'Eve', 'Guangzhou', 'Hangzhou'):
        self.assertIn(kept, printed)

    # Rows that fail the filter must not be printed
    for filtered_out in ('Alice', 'Bob', 'David'):
        self.assertNotIn(filtered_out, printed)

    # The 'age' values themselves must not be printed (column used for filtering only)
    for age_text in (' 35', ' 32', ' 25'):
        self.assertNotIn(age_text, printed)
| |
def test_cli_table_read_with_where_and_limit(self):
    """Test that where + limit returns correct filtered results without limit push-down.

    The same three-row batch is committed three times to a dedicated table
    created with ``source.split.target-size`` set to ``1b``. The intent is to
    produce multiple splits, so that a pushed-down limit would actually take
    effect and could miss matching rows located in later splits.
    """
    # Create a dedicated table for this test.
    pa_schema = pa.schema([
        ('id', pa.int32()),
        ('val', pa.string()),
        ('score', pa.int32()),
    ])
    # Important: multiple splits are required for the limit to take effect
    schema = Schema.from_pyarrow_schema(pa_schema, options={'source.split.target-size': '1b'})
    self.catalog.create_table('test_db.limit_test', schema, True)
    table = self.catalog.get_table('test_db.limit_test')

    # Every commit writes identical rows, so the Arrow table is built once.
    batch = pa.Table.from_pydict({
        'id': [1, 2, 3],
        'val': ['a', 'b', 'c'],
        'score': [10, 20, 30],
    }, schema=pa_schema)

    def write_batch():
        """Commit one copy of ``batch`` as its own write/commit cycle."""
        write_builder = table.new_batch_write_builder()
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        try:
            table_write.write_arrow(batch)
            table_commit.commit(table_write.prepare_commit())
        finally:
            # Release writer resources even if the write or commit fails.
            table_write.close()
            table_commit.close()

    # Three separate commits, each contributing one row with score = 20.
    for _ in range(3):
        write_batch()

    argv = [
        'paimon', '-c', self.config_file,
        'table', 'read', 'test_db.limit_test',
        '--where', 'score = 20',
        '--limit', '2',
    ]
    with patch('sys.argv', argv), \
            patch('sys.stdout', new_callable=StringIO) as mock_stdout:
        try:
            main()
        except SystemExit:
            # The CLI may terminate via sys.exit(); the output is validated below.
            pass
        output = mock_stdout.getvalue()

    lines = [line for line in output.strip().split('\n') if line.strip()]
    # Presumably one header line plus the two rows allowed by --limit.
    self.assertEqual(len(lines), 3)
    # Rows whose score is not 20 carry val 'a' or 'c' and must be filtered out.
    self.assertNotIn(' a ', output)
    self.assertNotIn(' c ', output)
| |
def test_cli_table_read_with_invalid_where(self):
    """Check that an unparsable ``--where`` clause is reported on stderr."""
    argv = [
        'paimon', '-c', self.config_file,
        'table', 'read', 'test_db.users',
        '--where', 'age INVALID 30',
    ]
    with patch('sys.argv', argv), \
            patch('sys.stderr', new_callable=StringIO) as captured_err:
        try:
            main()
        except SystemExit:
            # A sys.exit() is tolerated here; the message on stderr is what is verified.
            pass
        error_output = captured_err.getvalue()

    self.assertIn('Error', error_output)
| |
def test_cli_table_rename_basic(self):
    """Rename a table through the CLI and verify the catalog afterwards.

    A source table is first created with ``table create`` from a JSON schema
    file, then renamed with ``table rename``. The target must be loadable and
    loading the old name must raise.
    """
    import json

    # Write the schema file consumed by `table create`.
    schema_file = os.path.join(self.tempdir, 'rename_schema.json')
    fields = [
        {'id': 0, 'name': 'id', 'type': 'INT'},
        {'id': 1, 'name': 'value', 'type': 'STRING'}
    ]
    with open(schema_file, 'w') as f:
        json.dump({'fields': fields}, f)

    # Step 1: create the table that will be renamed (output is discarded).
    create_argv = ['paimon', '-c', self.config_file, 'table', 'create',
                   'test_db.rename_source', '-s', schema_file, '-i']
    with patch('sys.argv', create_argv), \
            patch('sys.stdout', new_callable=StringIO):
        try:
            main()
        except SystemExit:
            pass

    # The source table must exist before the rename.
    source_table = self.catalog.get_table('test_db.rename_source')
    self.assertIsNotNone(source_table)

    # Step 2: rename it.
    rename_argv = ['paimon', '-c', self.config_file, 'table', 'rename',
                   'test_db.rename_source', 'test_db.rename_target']
    with patch('sys.argv', rename_argv), \
            patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass
        output = captured.getvalue()

    for word in ('renamed', 'successfully'):
        self.assertIn(word, output)

    # The table is reachable under its new name ...
    target_table = self.catalog.get_table('test_db.rename_target')
    self.assertIsNotNone(target_table)

    # ... and no longer under the old one.
    with self.assertRaises(Exception):
        self.catalog.get_table('test_db.rename_source')
| |
@classmethod
def _create_partitioned_table(cls):
    """Create the partitioned test table and commit one batch per partition.

    The table ``test_db.partitioned`` is partitioned by ``dt`` and ``region``.
    Two batches are committed, each in its own write/commit cycle:
    ``dt=2024-01-01, region=us`` (2 rows) and ``dt=2024-01-02, region=eu``
    (3 rows).

    NOTE(review): several tests call this helper and ``create_table`` is
    passed ``True`` (presumably ignore-if-exists), so repeated calls likely
    commit the same rows again into the existing table -- confirm if record
    counts ever get asserted.
    """
    pa_schema = pa.schema([
        ('dt', pa.string()),
        ('region', pa.string()),
        ('id', pa.int32()),
        ('value', pa.string())
    ])

    schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt', 'region'])
    cls.catalog.create_table('test_db.partitioned', schema, True)

    table = cls.catalog.get_table('test_db.partitioned')

    # One entry per partition; each is written and committed separately.
    partition_batches = [
        # Partition dt=2024-01-01, region=us
        {
            'dt': ['2024-01-01', '2024-01-01'],
            'region': ['us', 'us'],
            'id': [1, 2],
            'value': ['a', 'b'],
        },
        # Partition dt=2024-01-02, region=eu
        {
            'dt': ['2024-01-02', '2024-01-02', '2024-01-02'],
            'region': ['eu', 'eu', 'eu'],
            'id': [3, 4, 5],
            'value': ['c', 'd', 'e'],
        },
    ]

    for columns in partition_batches:
        write_builder = table.new_batch_write_builder()
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        try:
            data = pa.Table.from_pydict(columns, schema=pa_schema)
            table_write.write_arrow(data)
            table_commit.commit(table_write.prepare_commit())
        finally:
            # Always release the writer and committer, even when a step fails.
            table_write.close()
            table_commit.close()
| |
def test_cli_table_list_partitions(self):
    """Check ``table list-partitions`` output against a real partitioned table."""
    self._create_partitioned_table()

    argv = [
        'paimon', '-c', self.config_file,
        'table', 'list-partitions', 'test_db.partitioned',
    ]
    with patch('sys.argv', argv), \
            patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            # Exit through sys.exit() is fine; the listing itself is checked below.
            pass
        output = captured.getvalue()

    # Header columns.
    for column in ('Partition', 'RecordCount', 'FileSizeInBytes', 'FileCount'):
        self.assertIn(column, output)

    # Partition specs written by the helper.
    for spec in ('dt=2024-01-01,region=us', 'dt=2024-01-02,region=eu'):
        self.assertIn(spec, output)
| |
def test_cli_table_list_partitions_with_pattern(self):
    """Check that ``--pattern`` restricts the partitions that are listed."""
    self._create_partitioned_table()

    argv = [
        'paimon', '-c', self.config_file,
        'table', 'list-partitions', 'test_db.partitioned',
        '--pattern', 'dt=2024-01-01*',
    ]
    with patch('sys.argv', argv), \
            patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            # A sys.exit() from the CLI is tolerated; the output is verified below.
            pass
        output = captured.getvalue()

    # Only the partition matching the pattern should be printed.
    self.assertIn('dt=2024-01-01,region=us', output)
    self.assertNotIn('dt=2024-01-02', output)
| |
def test_cli_table_read_format_json(self):
    """Check that ``table read --format json`` prints a JSON list of row objects."""
    import json

    argv = [
        'paimon', '-c', self.config_file,
        'table', 'read', 'test_db.users', '--format', 'json',
    ]
    with patch('sys.argv', argv), \
            patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            # Exit through sys.exit() is acceptable; stdout must still be valid JSON.
            pass
        output = captured.getvalue()

    rows = json.loads(output)
    self.assertIsInstance(rows, list)
    self.assertGreaterEqual(len(rows), 5)

    names = {row['name'] for row in rows}
    for expected_name in ('Alice', 'Bob'):
        self.assertIn(expected_name, names)

    # Every row object must expose all four table columns.
    for row in rows:
        for column in ('id', 'name', 'age', 'city'):
            self.assertIn(column, row)
| |
def test_cli_table_read_format_json_with_select(self):
    """Check that JSON output contains only the columns named in ``--select``."""
    import json

    argv = [
        'paimon', '-c', self.config_file,
        'table', 'read', 'test_db.users',
        '--select', 'name,age', '--format', 'json',
    ]
    with patch('sys.argv', argv), \
            patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            # A sys.exit() from the CLI is tolerated; the JSON payload is parsed below.
            pass
        output = captured.getvalue()

    rows = json.loads(output)
    self.assertIsInstance(rows, list)
    for row in rows:
        # Selected columns are present ...
        for column in ('name', 'age'):
            self.assertIn(column, row)
        # ... and unselected ones are projected away.
        for column in ('id', 'city'):
            self.assertNotIn(column, row)
| |
def test_cli_table_list_partitions_format_json(self):
    """Check that ``list-partitions --format json`` prints one object per partition."""
    import json

    self._create_partitioned_table()

    argv = [
        'paimon', '-c', self.config_file,
        'table', 'list-partitions', 'test_db.partitioned',
        '--format', 'json',
    ]
    with patch('sys.argv', argv), \
            patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            # Exit through sys.exit() is fine; stdout is parsed as JSON below.
            pass
        output = captured.getvalue()

    rows = json.loads(output)
    self.assertIsInstance(rows, list)
    # The helper writes into exactly two partitions.
    self.assertEqual(len(rows), 2)

    for row in rows:
        for key in ('Partition', 'RecordCount', 'FileSizeInBytes', 'FileCount'):
            self.assertIn(key, row)

    partitions = {row['Partition'] for row in rows}
    for spec in ('dt=2024-01-01,region=us', 'dt=2024-01-02,region=eu'):
        self.assertIn(spec, partitions)
| |
def test_cli_table_list_partitions_empty(self):
    """Check ``list-partitions`` on a table that has no partition keys and no data."""
    # A table is created but nothing is ever written to it.
    empty_schema = Schema.from_pyarrow_schema(pa.schema([
        ('id', pa.int32()),
        ('val', pa.string())
    ]))
    self.catalog.create_table('test_db.empty_part', empty_schema, True)

    argv = [
        'paimon', '-c', self.config_file,
        'table', 'list-partitions', 'test_db.empty_part',
    ]
    with patch('sys.argv', argv), \
            patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            # A sys.exit() from the CLI is tolerated; the message is checked below.
            pass
        output = captured.getvalue()

    self.assertIn('No partitions found', output)
| |
| |
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()