| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import json |
| import os |
| import shutil |
| import tempfile |
| import unittest |
| from io import StringIO |
| from unittest.mock import patch |
| |
| import pyarrow as pa |
| |
| from pypaimon import CatalogFactory, Schema |
| from pypaimon.cli.cli import main |
| |
| |
class CliDbTest(unittest.TestCase):
    """Integration tests for CLI database commands with real catalog operations.

    One catalog rooted in a temporary warehouse directory is created for the
    whole class and shared by every test, so each test uses its own database
    name to stay independent of execution order.
    """

    @classmethod
    def setUpClass(cls):
        """Set up test catalog and configuration."""
        cls.tempdir = tempfile.mkdtemp()
        try:
            cls.warehouse = os.path.join(cls.tempdir, 'warehouse')

            # Catalog used by the tests to prepare and verify state directly.
            cls.catalog = CatalogFactory.create({
                'warehouse': cls.warehouse
            })

            # Config file handed to the CLI through the ``-c`` option.
            cls.config_file = os.path.join(cls.tempdir, 'paimon.yaml')
            with open(cls.config_file, 'w') as f:
                f.write(f"metastore: filesystem\nwarehouse: {cls.warehouse}\n")
        except Exception:
            # unittest skips tearDownClass when setUpClass raises, so the
            # temporary directory has to be removed here to avoid leaking it.
            shutil.rmtree(cls.tempdir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        """Clean up temporary directory."""
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _run_cli(self, *args):
        """Run the CLI entry point with ``args`` after the common prefix.

        ``sys.argv`` is patched to ``['paimon', '-c', <config file>, *args]``
        and both ``sys.stdout`` and ``sys.stderr`` are captured.

        Returns:
            A ``(exit_code, stdout, stderr)`` tuple. ``exit_code`` is the
            ``code`` of the raised ``SystemExit``, or ``None`` when ``main()``
            returned without raising it.
        """
        argv = ['paimon', '-c', self.config_file, *args]
        exit_code = None
        with patch('sys.argv', argv), \
                patch('sys.stdout', new_callable=StringIO) as mock_stdout, \
                patch('sys.stderr', new_callable=StringIO) as mock_stderr:
            try:
                main()
            except SystemExit as exc:
                exit_code = exc.code
        return exit_code, mock_stdout.getvalue(), mock_stderr.getvalue()

    def _run_cli_ok(self, *args):
        """Run the CLI, require a successful exit and return captured stdout.

        A non-zero exit code fails the test immediately and reports the
        captured stderr, instead of being silently swallowed and surfacing
        later as an unrelated assertion failure.
        """
        exit_code, stdout, stderr = self._run_cli(*args)
        self.assertIn(
            exit_code, (None, 0),
            msg=f"CLI {args!r} exited with {exit_code!r}; stderr: {stderr}")
        return stdout

    def _assert_cli_fails(self, expected_message, *args):
        """Run the CLI and require exit code 1 with ``expected_message`` on stderr."""
        exit_code, _, stderr = self._run_cli(*args)
        self.assertEqual(exit_code, 1)
        self.assertIn(expected_message, stderr)

    def _assert_database_missing(self, name):
        """Assert that looking up ``name`` in the catalog raises DatabaseNotExistException."""
        from pypaimon.catalog.catalog_exception import DatabaseNotExistException
        with self.assertRaises(DatabaseNotExistException):
            self.catalog.get_database(name)

    # ------------------------------------------------------------------
    # db create
    # ------------------------------------------------------------------

    def test_cli_db_create_basic(self):
        """Test basic database create via CLI."""
        output = self._run_cli_ok('db', 'create', 'new_db')
        self.assertIn('created successfully', output)

        # Verify database was created
        database = self.catalog.get_database('new_db')
        self.assertEqual(database.name, 'new_db')

    def test_cli_db_create_ignore_if_exists(self):
        """Test database create with ignore-if-exists flag."""
        # Create database first
        self.catalog.create_database('existing_db', True)

        # Try to create again with ignore-if-exists flag
        output = self._run_cli_ok('db', 'create', 'existing_db', '-i')
        self.assertIn('created successfully', output)

    def test_cli_db_create_already_exists_error(self):
        """Test database create raises error when database already exists."""
        self.catalog.create_database('dup_db', True)

        self._assert_cli_fails('Error', 'db', 'create', 'dup_db')

    # ------------------------------------------------------------------
    # db get
    # ------------------------------------------------------------------

    def test_cli_db_get_basic(self):
        """Test basic database get via CLI."""
        self.catalog.create_database('get_db', True)

        output = self._run_cli_ok('db', 'get', 'get_db')

        # Verify output is valid JSON
        result = json.loads(output)
        self.assertEqual(result['name'], 'get_db')
        self.assertIn('options', result)

    def test_cli_db_get_nonexistent(self):
        """Test CLI error handling for nonexistent database."""
        self._assert_cli_fails('Error', 'db', 'get', 'nonexistent_db')

    # ------------------------------------------------------------------
    # db drop
    # ------------------------------------------------------------------

    def test_cli_db_drop_basic(self):
        """Test basic database drop via CLI."""
        self.catalog.create_database('drop_db', True)

        output = self._run_cli_ok('db', 'drop', 'drop_db')
        self.assertIn('dropped successfully', output)

        # Verify database was dropped
        self._assert_database_missing('drop_db')

    def test_cli_db_drop_nonexistent_error(self):
        """Test CLI error handling for dropping nonexistent database."""
        self._assert_cli_fails('Error', 'db', 'drop', 'nonexistent_drop_db')

    def test_cli_db_drop_ignore_if_not_exists(self):
        """Test database drop with ignore-if-not-exists flag."""
        output = self._run_cli_ok('db', 'drop', 'nonexistent_ignore_db', '-i')
        self.assertIn('dropped successfully', output)

    def test_cli_db_drop_cascade(self):
        """Test database drop with cascade flag."""
        # Create database with a table
        self.catalog.create_database('cascade_db', True)

        pa_schema = pa.schema([
            ('id', pa.int32()),
            ('name', pa.string())
        ])
        schema = Schema.from_pyarrow_schema(pa_schema)
        self.catalog.create_table('cascade_db.test_table', schema, False)

        # Drop with cascade
        output = self._run_cli_ok('db', 'drop', 'cascade_db', '--cascade')
        self.assertIn('dropped successfully', output)

        # Verify database was dropped
        self._assert_database_missing('cascade_db')

    # ------------------------------------------------------------------
    # db list-tables
    # ------------------------------------------------------------------

    def test_cli_db_list_tables_basic(self):
        """Test basic list-tables via CLI."""
        # Create database with tables
        self.catalog.create_database('list_db', True)

        pa_schema = pa.schema([
            ('id', pa.int32()),
            ('value', pa.string())
        ])
        schema = Schema.from_pyarrow_schema(pa_schema)
        self.catalog.create_table('list_db.table_a', schema, True)
        self.catalog.create_table('list_db.table_b', schema, True)

        output = self._run_cli_ok('db', 'list-tables', 'list_db')
        self.assertIn('table_a', output)
        self.assertIn('table_b', output)

    def test_cli_db_list_tables_empty(self):
        """Test list-tables for empty database."""
        self.catalog.create_database('empty_list_db', True)

        output = self._run_cli_ok('db', 'list-tables', 'empty_list_db')
        self.assertIn('No tables found', output)

    def test_cli_db_list_tables_nonexistent_db(self):
        """Test list-tables for nonexistent database."""
        self._assert_cli_fails('Error', 'db', 'list-tables', 'nonexistent_list_db')

    # ------------------------------------------------------------------
    # db alter
    # ------------------------------------------------------------------

    def test_cli_db_alter_not_supported(self):
        """Test db alter on filesystem catalog raises not supported error."""
        self.catalog.create_database('alter_db', True)

        self._assert_cli_fails(
            'not supported',
            'db', 'alter', 'alter_db', '--set', '{"key1": "value1"}')

    def test_cli_db_alter_no_changes(self):
        """Test db alter with no changes specified."""
        self.catalog.create_database('alter_no_change_db', True)

        self._assert_cli_fails(
            'No changes specified',
            'db', 'alter', 'alter_no_change_db')

    def test_cli_db_alter_invalid_json(self):
        """Test db alter with invalid JSON for --set."""
        self.catalog.create_database('alter_invalid_db', True)

        self._assert_cli_fails(
            'Invalid JSON',
            'db', 'alter', 'alter_invalid_db', '--set', 'not_valid_json')

    # ------------------------------------------------------------------
    # catalog list-dbs
    # ------------------------------------------------------------------

    def test_cli_catalog_list_dbs_basic(self):
        """Test basic list databases via catalog CLI."""
        self.catalog.create_database('list_db_a', True)
        self.catalog.create_database('list_db_b', True)

        output = self._run_cli_ok('catalog', 'list-dbs')
        self.assertIn('list_db_a', output)
        self.assertIn('list_db_b', output)

    def test_cli_catalog_list_dbs_excludes_nonexistent(self):
        """Test list databases does not include non-existent databases."""
        self.catalog.create_database('list_exists_db', True)

        output = self._run_cli_ok('catalog', 'list-dbs')
        self.assertIn('list_exists_db', output)
        self.assertNotIn('never_created_db', output)
| |
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()