################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import random
import pandas as pd
import pyarrow as pa

from pypaimon.api import Schema
from pypaimon.py4j.tests import PypaimonTestBase


def _check_filtered_result(read_builder, expected_df):
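    """Read all splits planned by read_builder and assert that the result
    equals expected_df, ignoring the row index."""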
    scan = read_builder.new_scan()
    read = read_builder.new_read()
    actual_df = read.to_pandas(scan.plan().splits())
    pd.testing.assert_frame_equal(
        actual_df.reset_index(drop=True), expected_df.reset_index(drop=True))


# TODO: The Parquet format currently has a bug (fixed in 1.0), so pick the
# file format from the remaining options for now.
def _random_format():
    return random.choice(['avro', 'orc'])


class PredicateTest(PypaimonTestBase):
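    """Tests for predicate filtering through with_filter on an append-only
    table and a primary-key table."""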

    @classmethod
    def setUpClass(cls):
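        # Build an append-only table and a primary-key table (pk on f0, one
        # bucket) that share the same schema and rows.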
        super().setUpClass()
        pa_schema = pa.schema([
            ('f0', pa.int64()),
            ('f1', pa.string()),
        ])
        cls.catalog.create_table('default.test_append',
                                 Schema(pa_schema, options={'file.format': _random_format()}),
                                 False)
        cls.catalog.create_table('default.test_pk',
                                 Schema(pa_schema, primary_keys=['f0'],
                                        options={'bucket': '1', 'file.format': _random_format()}),
                                 False)

        df = pd.DataFrame({
            'f0': [1, 2, 3, 4, 5],
            'f1': ['abc', 'abbc', 'bc', 'd', None],
        })

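        # Write the same rows into both tables; individual tests slice cls.df
        # to build their expected results.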
        append_table = cls.catalog.get_table('default.test_append')
        write_builder = append_table.new_batch_write_builder()
        write = write_builder.new_write()
        commit = write_builder.new_commit()
        write.write_pandas(df)
        commit.commit(write.prepare_commit())
        write.close()
        commit.close()

        pk_table = cls.catalog.get_table('default.test_pk')
        write_builder = pk_table.new_batch_write_builder()
        write = write_builder.new_write()
        commit = write_builder.new_commit()
        write.write_pandas(df)
        commit.commit(write.prepare_commit())
        write.close()
        commit.close()

        cls.df = df

    def testWrongFieldName(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        with self.assertRaises(ValueError) as e:
            predicate_builder.equal('f2', 'a')
        self.assertEqual(str(e.exception), "The field f2 is not in field list ['f0', 'f1'].")

    def testAppendWithDuplicate(self):
        pa_schema = pa.schema([
            ('f0', pa.int64()),
            ('f1', pa.string()),
        ])
        self.catalog.create_table('default.test_append_with_duplicate', Schema(pa_schema), False)

        df = pd.DataFrame({
            'f0': [1, 1, 2, 2],
            'f1': ['a', 'b', 'c', 'd'],
        })

        table = self.catalog.get_table('default.test_append_with_duplicate')
        write_builder = table.new_batch_write_builder()
        write = write_builder.new_write()
        commit = write_builder.new_commit()
        write.write_pandas(df)
        commit.commit(write.prepare_commit())
        write.close()
        commit.close()

        predicate_builder = table.new_read_builder().new_predicate_builder()

        predicate = predicate_builder.equal('f0', 1)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[0:1])

        predicate = predicate_builder.equal('f0', 0)
        read_builder = table.new_read_builder().with_filter(predicate)
        scan = read_builder.new_scan()
        read = read_builder.new_read()
        actual_df = read.to_pandas(scan.plan().splits())
        self.assertEqual(len(actual_df), 0)

    def testAllFieldTypesWithEqual(self):
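        # Cover equality predicates across the numeric, string and boolean types.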
        pa_schema = pa.schema([
            # int
            ('_tinyint', pa.int8()),
            ('_smallint', pa.int16()),
            ('_int', pa.int32()),
            ('_bigint', pa.int64()),
            # float
            ('_float16', pa.float32()),  # NOTE: cannot write pa.float16() data into Paimon
            ('_float32', pa.float32()),
            ('_double', pa.float64()),
            # string
            ('_string', pa.string()),
            # bool
            ('_boolean', pa.bool_())
        ])
        self.catalog.create_table('default.test_all_field_types',
                                  Schema(pa_schema, options={'file.format': _random_format()}),
                                  False)
        table = self.catalog.get_table('default.test_all_field_types')
        write_builder = table.new_batch_write_builder()
        write = write_builder.new_write()
        commit = write_builder.new_commit()

        df = pd.DataFrame({
            '_tinyint': pd.Series([1, 2], dtype='int8'),
            '_smallint': pd.Series([10, 20], dtype='int16'),
            '_int': pd.Series([100, 200], dtype='int32'),
            '_bigint': pd.Series([1000, 2000], dtype='int64'),
            '_float16': pd.Series([1.0, 2.0], dtype='float16'),
            '_float32': pd.Series([1.00, 2.00], dtype='float32'),
            '_double': pd.Series([1.000, 2.000], dtype='double'),
            '_string': pd.Series(['A', 'B'], dtype='object'),
            '_boolean': [True, False]
        })
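        # Build an Arrow batch against the declared schema so each column is
        # cast to the declared Arrow type (e.g. float16 -> float32) before writing.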
        record_batch = pa.RecordBatch.from_pandas(df, schema=pa_schema)
        # _float16 is stored as float32 (see the schema above), so cast the
        # expected frame accordingly before asserting.
        df['_float16'] = df['_float16'].astype('float32')

        write.write_arrow_batch(record_batch)
        commit.commit(write.prepare_commit())
        write.close()
        commit.close()

        predicate_builder = table.new_read_builder().new_predicate_builder()

        predicate = predicate_builder.equal('_tinyint', 1)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])

        predicate = predicate_builder.equal('_smallint', 20)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])

        predicate = predicate_builder.equal('_int', 100)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])

        predicate = predicate_builder.equal('_bigint', 2000)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])

        predicate = predicate_builder.equal('_float16', 1.0)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])

        predicate = predicate_builder.equal('_float32', 2.00)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])

        predicate = predicate_builder.equal('_double', 1.000)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])

        predicate = predicate_builder.equal('_string', 'B')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])

        predicate = predicate_builder.equal('_boolean', True)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])

    def testEqualPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.equal('f0', 1)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0]])

    def testNotEqualAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.not_equal('f0', 1)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4])

    def testNotEqualPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.not_equal('f0', 1)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4])

    def testLessThanAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.less_than('f0', 3)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])

    def testLessThanPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.less_than('f0', 3)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])

    def testLessOrEqualAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.less_or_equal('f0', 3)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])

    def testLessOrEqualPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.less_or_equal('f0', 3)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])

    def testGreaterThanAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.greater_than('f0', 3)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4])

    def testGreaterThanPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.greater_than('f0', 3)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4])

    def testGreaterOrEqualAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.greater_or_equal('f0', 3)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])

    def testGreaterOrEqualPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.greater_or_equal('f0', 3)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])

    def testIsNullAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.is_null('f1')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]])

    def testIsNullPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.is_null('f1')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]])

    def testIsNotNullAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.is_not_null('f1')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3])

    def testIsNotNullPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.is_not_null('f1')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3])

    def testStartswithAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.startswith('f1', 'ab')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])

    def testStartswithPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.startswith('f1', 'ab')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])

    def testEndswithAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.endswith('f1', 'bc')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])

    def testEndswithPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.endswith('f1', 'bc')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])

    def testContainsAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.contains('f1', 'bb')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])

    def testContainsPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.contains('f1', 'bb')
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])

    def testIsInAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.is_in('f0', [1, 2])
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])

    def testIsInPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.is_in('f1', ['abc', 'd'])
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0, 3]])

    def testIsNotInAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.is_not_in('f0', [1, 2])
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])

    def testIsNotInPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.is_not_in('f1', ['abc', 'd'])
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:2])

    def testBetweenAppend(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.between('f0', 1, 3)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])

    def testBetweenPk(self):
        table = self.catalog.get_table('default.test_pk')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate = predicate_builder.between('f0', 1, 3)
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])

    def testAndPredicates(self):
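        # and_predicates combines the simple predicates conjunctively: rows must
        # satisfy both f0 > 1 and f1 starting with 'ab'.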
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate1 = predicate_builder.greater_than('f0', 1)
        predicate2 = predicate_builder.startswith('f1', 'ab')
        predicate = predicate_builder.and_predicates([predicate1, predicate2])
        _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])

    def testOrPredicates(self):
        table = self.catalog.get_table('default.test_append')
        predicate_builder = table.new_read_builder().new_predicate_builder()
        predicate1 = predicate_builder.greater_than('f0', 3)
        predicate2 = predicate_builder.less_than('f0', 2)
        predicate = predicate_builder.or_predicates([predicate1, predicate2])
        _check_filtered_result(table.new_read_builder().with_filter(predicate),
                               self.df.loc[[0, 3, 4]])