# blob: 72bd0d8b6ff273697993179c42c591a700e7d32a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import requests
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import create_uncompressed_text_dimension
class TestReusePartitions(ImpalaTestSuite):
    """Tests for catalogd reusing unchanged partition instances for DDL/DMLs.

    Reuse is checked indirectly: the partition ids of a table are read from its
    JSON catalog object before and after each statement and then compared.
    """

    # URL template for fetching a table's catalog object as JSON; formatted with
    # (db_name, tbl_name).
    # NOTE(review): presumably served by catalogd's debug web UI - confirm the port.
    JSON_TABLE_OBJECT_URL = "http://localhost:25020/catalog_object?" \
        "json&object_type=TABLE&object_name={0}.{1}"

    # Upper bound, in seconds, for a single catalog object HTTP request. Without
    # it an unresponsive endpoint would block the test run indefinitely.
    CATALOG_REQUEST_TIMEOUT_S = 60

    @classmethod
    def get_workload(cls):
        """Return the name of the workload these tests run against."""
        return 'functional-query'

    @classmethod
    def add_test_dimensions(cls):
        """Add the uncompressed text dimension of this workload to the test matrix."""
        super(TestReusePartitions, cls).add_test_dimensions()
        # There is no reason to run these tests using all dimensions.
        cls.ImpalaTestMatrix.add_dimension(
            create_uncompressed_text_dimension(cls.get_workload()))

    def __get_partition_id_set(self, db_name, tbl_name):
        """Return the set of partition ids currently listed for a table.

        Fetches the table's catalog object over HTTP and returns the keys of its
        "partitions" map as a set.

        Args:
            db_name: name of the database containing the table.
            tbl_name: unqualified table name.

        Raises:
            AssertionError: if the request does not return HTTP 200 or the catalog
                object lacks the "table" / "hdfs_table" / "partitions" entries.
            requests.exceptions.RequestException: if the request fails or exceeds
                CATALOG_REQUEST_TIMEOUT_S.
        """
        obj_url = self.JSON_TABLE_OBJECT_URL.format(db_name, tbl_name)
        # Bound the request so a stuck endpoint fails the test instead of hanging it.
        response = requests.get(obj_url, timeout=self.CATALOG_REQUEST_TIMEOUT_S)
        assert response.status_code == requests.codes.ok, \
            "GET %s returned HTTP %s" % (obj_url, response.status_code)
        # The response body is JSON whose "json_string" field holds the catalog
        # object, itself encoded as a JSON string, hence the two decoding passes.
        catalog_obj = json.loads(json.loads(response.text)["json_string"])
        assert "table" in catalog_obj
        assert "hdfs_table" in catalog_obj["table"]
        tbl_obj = catalog_obj["table"]["hdfs_table"]
        assert "partitions" in tbl_obj
        # Iterating a dict yields its keys, so no explicit .keys() call is needed.
        return set(tbl_obj["partitions"])

    def test_reuse_partitions_nontransactional(self, unique_database):
        """Run the partition reuse scenario on a non-transactional table."""
        self.__test_reuse_partitions_helper(unique_database, transactional=False)

    def test_reuse_partitions_transactional(self, unique_database):
        """Run the partition reuse scenario on an insert-only transactional table."""
        self.__test_reuse_partitions_helper(unique_database, transactional=True)

    def __test_reuse_partitions_helper(self, unique_database, transactional=False):
        """Test catalogd reuses partition instances by verifying the partition ids
        are unchanged.

        Args:
            unique_database: name of the database in which the test table is created.
            transactional: if True, the table is created with insert-only
                transactional properties and the ALTER TABLE and DROP STATS steps
                are skipped.
        """
        tbl_name = "tbl"
        # Fully qualified name, built once and shared by every statement below.
        fq_tbl_name = "%s.%s" % (unique_database, tbl_name)

        def current_part_ids():
            # Snapshot of the partition ids as currently reported for the table.
            return self.__get_partition_id_set(unique_database, tbl_name)

        create_tbl_ddl = \
            "create table %s (id int) partitioned by (p int) stored as textfile" \
            % fq_tbl_name
        if transactional:
            create_tbl_ddl += " tblproperties('transactional'='true'," \
                              " 'transactional_properties'='insert_only')"

        # Creates a partitioned table with 3 partitions.
        self.client.execute(create_tbl_ddl)
        self.client.execute(
            "insert into %s partition (p) values (1, 1), (2, 2), (3, 3)" % fq_tbl_name)
        part_ids = current_part_ids()
        assert len(part_ids) == 3

        # REFRESH can reuse the existing partition instances.
        self.client.execute("refresh %s" % fq_tbl_name)
        assert current_part_ids() == part_ids

        # INSERT query that only touches one partition will reuse the other partitions.
        self.client.execute("insert into %s partition (p) values (1, 1)" % fq_tbl_name)
        new_part_ids = current_part_ids()
        assert len(part_ids.intersection(new_part_ids)) == 2
        part_ids = new_part_ids

        # INSERT query that adds a new partition will reuse the existing partitions.
        self.client.execute("insert into %s partition(p) values (4, 4)" % fq_tbl_name)
        new_part_ids = current_part_ids()
        assert len(part_ids.intersection(new_part_ids)) == 3
        part_ids = new_part_ids

        # ALTER TABLE not supported on transactional tables (IMPALA-8831).
        if not transactional:
            # ALTER statements that don't touch data will reuse the existing partitions.
            self.client.execute(
                "alter table %s set tblproperties('numRows'='4')" % fq_tbl_name)
            assert current_part_ids() == part_ids
            self.client.execute("alter table %s add column name string" % fq_tbl_name)
            assert current_part_ids() == part_ids
            self.client.execute("alter table %s drop column name" % fq_tbl_name)
            assert current_part_ids() == part_ids

            # ALTER statements that modify a partition will reuse other partitions.
            self.client.execute("alter table %s add partition (p=5)" % fq_tbl_name)
            new_part_ids = current_part_ids()
            assert len(new_part_ids) == 5
            assert len(part_ids.intersection(new_part_ids)) == 4
            self.client.execute("alter table %s drop partition (p=5)" % fq_tbl_name)
            new_part_ids = current_part_ids()
            assert part_ids == new_part_ids

        # Updating stats will also update partition stats so no instances can be reused.
        self.client.execute("compute stats %s" % fq_tbl_name)
        new_part_ids = current_part_ids()
        assert len(new_part_ids) == 4
        assert len(part_ids.intersection(new_part_ids)) == 0
        self.client.execute("compute incremental stats %s" % fq_tbl_name)
        new_part_ids = current_part_ids()
        assert len(new_part_ids) == 4
        # part_ids still holds the ids captured before COMPUTE STATS, so this check
        # compares against that earlier snapshot.
        assert len(part_ids.intersection(new_part_ids)) == 0
        part_ids = new_part_ids

        # DROP STATS not supported on transactional tables (HIVE-22104).
        if not transactional:
            # Drop incremental stats of one partition can reuse the other 3 partitions.
            self.client.execute(
                "drop incremental stats %s partition (p=1)" % fq_tbl_name)
            new_part_ids = current_part_ids()
            assert len(part_ids.intersection(new_part_ids)) == 3