| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import json |
| import requests |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| |
| |
| class TestReusePartitions(ImpalaTestSuite): |
| """Tests for catalogd reusing unchanged partition instances for DDL/DMLs""" |
| |
def test_reuse_partitions_nontransactional(self, unique_database):
    """Run the shared partition-reuse scenario with transactional=False."""
    self.__test_reuse_partitions_helper(
        unique_database,
        transactional=False,
    )
| |
def test_reuse_partitions_transactional(self, unique_database):
    """Run the shared partition-reuse scenario with transactional=True."""
    self.__test_reuse_partitions_helper(
        unique_database,
        transactional=True,
    )
| |
def __test_reuse_partitions_helper(self, unique_database, transactional=False):
    """Check that catalogd keeps partition instances across DDL/DML statements.

    A table named "tbl" partitioned by ``p`` is created in ``unique_database``
    and loaded with three partitions. After each subsequent statement the set
    returned by ``self.get_partition_id_set`` is compared with the previously
    recorded set to count how many partition ids were kept.

    When ``transactional`` is true the table is created with insert-only
    transactional properties, and the ALTER TABLE and DROP INCREMENTAL STATS
    steps are skipped.
    """
    tbl_name = "tbl"
    db_and_tbl = (unique_database, tbl_name)

    def run(stmt_template):
        # Each template has exactly two %s slots: database name, then table name.
        self.client.execute(stmt_template % db_and_tbl)

    def fetch_ids():
        return self.get_partition_id_set(unique_database, tbl_name)

    def count_reused(old_ids, cur_ids):
        return len(old_ids.intersection(cur_ids))

    create_tbl_ddl = (
        "create table %s.%s (id int) partitioned by (p int) stored as textfile")
    if transactional:
        create_tbl_ddl += (" tblproperties('transactional'='true',"
                           " 'transactional_properties'='insert_only')")

    # Set up the table and populate partitions p=1, p=2 and p=3.
    run(create_tbl_ddl)
    run("insert into %s.%s partition (p) values (1, 1), (2, 2), (3, 3)")
    part_ids = fetch_ids()
    assert len(part_ids) == 3

    # A REFRESH is expected to keep every existing partition instance.
    run("refresh %s.%s")
    assert fetch_ids() == part_ids

    # Inserting into a single existing partition should leave the other two alone.
    run("insert into %s.%s partition (p) values (1, 1)")
    new_part_ids = fetch_ids()
    assert count_reused(part_ids, new_part_ids) == 2
    part_ids = new_part_ids

    # Inserting into a brand-new partition should keep all three existing ones.
    run("insert into %s.%s partition(p) values (4, 4)")
    new_part_ids = fetch_ids()
    assert count_reused(part_ids, new_part_ids) == 3
    part_ids = new_part_ids

    # ALTER TABLE not supported on transactional tables (IMPALA-8831).
    if not transactional:
        # Table-level ALTERs that leave the data untouched keep every partition.
        for alter_stmt in (
                "alter table %s.%s set tblproperties('numRows'='4')",
                "alter table %s.%s add column name string",
                "alter table %s.%s drop column name"):
            run(alter_stmt)
            assert fetch_ids() == part_ids

        # Adding one partition keeps the 4 existing ones; dropping it again
        # brings the id set back to exactly what it was.
        run("alter table %s.%s add partition (p=5)")
        new_part_ids = fetch_ids()
        assert len(new_part_ids) == 5
        assert count_reused(part_ids, new_part_ids) == 4
        run("alter table %s.%s drop partition (p=5)")
        new_part_ids = fetch_ids()
        assert part_ids == new_part_ids

    # Computing stats also updates partition stats, so no instance is reused.
    # NOTE(review): both iterations compare against the ids recorded before the
    # first COMPUTE STATS (part_ids is only reassigned after the loop), so the
    # second check cannot detect reuse between the two stats runs - confirm
    # that this is intended.
    for stats_stmt in ("compute stats %s.%s",
                       "compute incremental stats %s.%s"):
        run(stats_stmt)
        new_part_ids = fetch_ids()
        assert len(new_part_ids) == 4
        assert count_reused(part_ids, new_part_ids) == 0
    part_ids = new_part_ids

    # DROP STATS not supported on transactional tables (HIVE-22104).
    if not transactional:
        # Dropping incremental stats of one partition keeps the other 3.
        run("drop incremental stats %s.%s partition (p=1)")
        new_part_ids = fetch_ids()
        assert count_reused(part_ids, new_part_ids) == 3