blob: 2a6413f207e1dad8d25c059280759014963bfa7e [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import shutil
import tempfile
import unittest
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.table.file_store_table import FileStoreTable
class FileStoreTableTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.tempdir = tempfile.mkdtemp()
cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
cls.catalog = CatalogFactory.create({
'warehouse': cls.warehouse
})
cls.catalog.create_database('default', True)
cls.pa_schema = pa.schema([
('user_id', pa.int32()),
('item_id', pa.int64()),
('behavior', pa.string()),
('dt', pa.string())
])
schema = Schema.from_pyarrow_schema(cls.pa_schema, partition_keys=['dt'],
options={CoreOptions.BUCKET.key(): "2"})
cls.catalog.create_table('default.test_copy_with_new_options', schema, False)
cls.table = cls.catalog.get_table('default.test_copy_with_new_options')
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tempdir, ignore_errors=True)
def test_copy_with_new_options(self):
"""Test copy method with new options."""
new_options = {"new.option": "new_value", "bucket": "2"}
# Call copy method
copied_table = self.table.copy(new_options)
# Verify the copied table is a new instance
self.assertIsNot(copied_table, self.table)
self.assertIsInstance(copied_table, FileStoreTable)
# Verify the new option is added
self.assertIn("new.option", copied_table.table_schema.options)
self.assertEqual(copied_table.table_schema.options["new.option"], "new_value")
def test_copy_raises_error_when_changing_bucket(self):
"""Test copy method raises ValueError when trying to change bucket number."""
# Get current bucket value
current_bucket = self.table.options.bucket()
# Try to change bucket number to a different value
new_bucket_value = current_bucket + 1
new_options = {CoreOptions.BUCKET.key(): new_bucket_value}
# Verify ValueError is raised
with self.assertRaises(ValueError) as context:
self.table.copy(new_options)
self.assertIn("Cannot change bucket number", str(context.exception))