# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.util.filesystem_utils import WAREHOUSE

# Map from the test dimension file_format string to the SQL "STORED AS"
# argument.
STORED_AS_ARGS = { 'text': 'textfile', 'parquet': 'parquet', 'avro': 'avro',
    'seq': 'sequencefile' }
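# For example, STORED_AS_ARGS['seq'] yields 'sequencefile', which is spliced into the
# "CREATE TABLE ... STORED AS <arg>" statements below.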

# Tests specific to partition metadata.
# TODO: Split up the DDL tests and move some of the partition-specific tests
# here.
class TestPartitionMetadata(ImpalaTestSuite):
  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestPartitionMetadata, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
    # Run one variation of the test with each file format that we support writing.
    # The compression shouldn't affect the partition handling, so restrict to the core
    # compression codecs.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format in ('text', 'parquet') and
         v.get_value('table_format').compression_codec == 'none') or
        (v.get_value('table_format').file_format in ('seq', 'avro') and
         v.get_value('table_format').compression_codec == 'snap' and
         v.get_value('table_format').compression_type == 'block'))
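    # The constraint above leaves four table_format variants: text/none, parquet/none,
    # seq/snap/block, and avro/snap/block.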

  @SkipIfLocal.hdfs_client
  def test_multiple_partitions_same_location(self, vector, unique_database):
    """Regression test for IMPALA-597. Verifies Impala is able to properly read
    tables that have multiple partitions pointing to the same location.
    """
    TBL_NAME = "same_loc_test"
    FQ_TBL_NAME = unique_database + "." + TBL_NAME
    TBL_LOCATION = '%s/%s.db/%s' % (WAREHOUSE, unique_database, TBL_NAME)
    file_format = vector.get_value('table_format').file_format
    # Clean up any existing data in the table directory.
    self.filesystem_client.delete_file_dir(TBL_NAME, recursive=True)
    # Create the table.
    self.client.execute(
        "create table %s (i int) partitioned by(j int) stored as %s location '%s'"
        % (FQ_TBL_NAME, STORED_AS_ARGS[file_format], TBL_LOCATION))
    # Use a partition location that does not contain a key=value path.
    self.filesystem_client.make_dir(TBL_NAME + '/p')
    # Point both partitions to the same location.
    self.client.execute("alter table %s add partition (j=1) location '%s/p'"
        % (FQ_TBL_NAME, TBL_LOCATION))
    self.client.execute("alter table %s add partition (j=2) location '%s/p'"
        % (FQ_TBL_NAME, TBL_LOCATION))
    # Allow the unsupported avro and sequence file writers.
    self.client.execute("set allow_unsupported_formats=true")
    # Insert some data. This will only update partition j=1 (IMPALA-1480).
    self.client.execute("insert into table %s partition(j=1) select 1" % FQ_TBL_NAME)
    # Refresh to update the file metadata of both partitions.
    self.client.execute("refresh %s" % FQ_TBL_NAME)
    # The data will be read twice because both partitions point to the same location.
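    # Expected: the single row (i=1) is scanned once per partition, so sum(i) = 2 and
    # sum(j) = 1 + 2 = 3.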
    data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
    if file_format == 'avro':
      # Avro writer is broken and produces nulls. Only check partition column.
      assert data.split('\t')[1] == '3'
    else:
      assert data.split('\t') == ['2', '3']
self.client.execute("insert into %s partition(j) select 1, 1" % FQ_TBL_NAME)
self.client.execute("insert into %s partition(j) select 1, 2" % FQ_TBL_NAME)
self.client.execute("refresh %s" % FQ_TBL_NAME)
    data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
    if file_format == 'avro':
      # Avro writer is broken and produces nulls. Only check partition column.
      assert data.split('\t')[1] == '9'
    else:
      assert data.split('\t') == ['6', '9']

    # Force all scan ranges to be on the same node. It should produce the same
    # result as above. See IMPALA-5412.
    self.client.execute("set num_nodes=1")
    data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
    if file_format == 'avro':
      # Avro writer is broken and produces nulls. Only check partition column.
      assert data.split('\t')[1] == '9'
    else:
      assert data.split('\t') == ['6', '9']

  @SkipIfS3.hive
  @SkipIfADLS.hive
  @SkipIfIsilon.hive
  @SkipIfLocal.hive
  def test_partition_metadata_compatibility(self, vector, unique_database):
    """Regression test for IMPALA-2048. For partitioned tables, test that when Impala
    updates the partition metadata (e.g. by doing a compute stats), the tables remain
    accessible in Hive."""
    FQ_TBL_HIVE = unique_database + ".part_parquet_tbl_hive"
    FQ_TBL_IMP = unique_database + ".part_parquet_tbl_impala"
    # First case: the table is created in Hive.
    self.run_stmt_in_hive("create table %s (a int) partitioned by (x int) "
        "stored as parquet" % FQ_TBL_HIVE)
self.run_stmt_in_hive("set hive.exec.dynamic.partition.mode=nostrict;"\
"insert into %s partition (x) values(1,1)" % FQ_TBL_HIVE)
self.run_stmt_in_hive("select * from %s" % FQ_TBL_HIVE)
# Load the table in Impala and modify its partition metadata by computing table
# statistics.
self.client.execute("invalidate metadata %s" % FQ_TBL_HIVE)
self.client.execute("compute stats %s" % FQ_TBL_HIVE)
self.client.execute("select * from %s" % FQ_TBL_HIVE)
# Make sure the table is accessible in Hive
self.run_stmt_in_hive("select * from %s" % FQ_TBL_HIVE)

    # Second case: the table is created in Impala.
    self.client.execute("create table %s (a int) partitioned by (x int) "
        "stored as parquet" % FQ_TBL_IMP)
    self.client.execute("insert into %s partition(x) values(1,1)" % FQ_TBL_IMP)
    # Make sure the table is accessible in Hive.
    self.run_stmt_in_hive("select * from %s" % FQ_TBL_IMP)
    # Compute table statistics.
    self.client.execute("compute stats %s" % FQ_TBL_IMP)
    self.client.execute("select * from %s" % FQ_TBL_IMP)
    # Make sure the table remains accessible in Hive.
    self.run_stmt_in_hive("select * from %s" % FQ_TBL_IMP)