# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from copy import deepcopy
import pytest
from pytest import skip
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import (
SkipIfIsilon,
SkipIfS3,
SkipIfABFS,
SkipIfADLS,
SkipIfEC,
SkipIfLocal,
SkipIfNotHdfsMinicluster
)
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_HDFS
class TestNestedTypes(ImpalaTestSuite):
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestNestedTypes, cls).add_test_dimensions()
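# Nested types can currently only be scanned from Parquet and ORC tables.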
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format in ['parquet', 'orc'])
def test_scanner_basic(self, vector):
"""Queries that do not materialize arrays."""
self.run_test_case('QueryTest/nested-types-scanner-basic', vector)
def test_scanner_array_materialization(self, vector):
"""Queries that materialize arrays."""
self.run_test_case('QueryTest/nested-types-scanner-array-materialization', vector)
def test_scanner_multiple_materialization(self, vector):
"""Queries that materialize the same array multiple times."""
self.run_test_case('QueryTest/nested-types-scanner-multiple-materialization', vector)
def test_scanner_position(self, vector):
"""Queries that materialize the artifical position element."""
self.run_test_case('QueryTest/nested-types-scanner-position', vector)
def test_scanner_map(self, vector):
"""Queries that materialize maps. (Maps looks like arrays of key/value structs, so
most map functionality is already tested by the array tests.)"""
self.run_test_case('QueryTest/nested-types-scanner-maps', vector)
def test_runtime(self, vector):
"""Queries that send collections through the execution runtime."""
self.run_test_case('QueryTest/nested-types-runtime', vector)
def test_subplan(self, vector):
"""Test subplans with various exec nodes inside it."""
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-subplan', vector,
use_db='tpch_nested' + db_suffix)
def test_subplan_single_node(self, vector):
"""Test subplans with various exec nodes inside it and num_nodes=1."""
new_vector = deepcopy(vector)
new_vector.get_value('exec_option')['num_nodes'] = 1
self.run_test_case('QueryTest/nested-types-subplan-single-node', new_vector)
def test_with_clause(self, vector):
"""Queries using nested types and with WITH clause."""
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-with-clause', vector,
use_db='tpch_nested' + db_suffix)
def test_tpch(self, vector):
"""Queries over the larger nested TPCH dataset."""
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-tpch', vector,
use_db='tpch_nested' + db_suffix)
def test_tpch_limit(self, vector):
"""Queries over the larger nested TPCH dataset with limits in their subplan."""
vector.get_value('exec_option')['batch_size'] = 10
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-tpch-limit', vector,
use_db='tpch_nested' + db_suffix)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
def test_tpch_mem_limit(self, vector):
"""Queries over the larger nested TPCH dataset with memory limits tuned for
a 3-node HDFS minicluster."""
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-tpch-mem-limit', vector,
use_db='tpch_nested' + db_suffix)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
def test_tpch_mem_limit_single_node(self, vector):
"""Queries over the larger nested TPCH dataset with memory limits tuned for
a 3-node HDFS minicluster with num_nodes=1."""
new_vector = deepcopy(vector)
new_vector.get_value('exec_option')['num_nodes'] = 1
if vector.get_value('table_format').file_format == 'orc':
# IMPALA-8336: lower memory limit for ORC
new_vector.get_value('exec_option')['mem_limit'] = '20M'
else:
new_vector.get_value('exec_option')['mem_limit'] = '28M'
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-tpch-mem-limit-single-node',
new_vector, use_db='tpch_nested' + db_suffix)
@SkipIfEC.fix_later
def test_parquet_stats(self, vector):
"""Queries that test evaluation of Parquet row group statistics."""
if vector.get_value('table_format').file_format == 'orc':
pytest.skip('Predicate push down on ORC stripe statistics is not supported '
'(IMPALA-6505)')
self.run_test_case('QueryTest/nested-types-parquet-stats', vector)
@SkipIfIsilon.hive
@SkipIfS3.hive
@SkipIfABFS.hive
@SkipIfADLS.hive
@SkipIfLocal.hive
def test_upper_case_field_name(self, unique_database):
"""IMPALA-5994: Tests that a Hive-created table with a struct field name with upper
case characters can be selected."""
table_name = "%s.upper_case_test" % unique_database
create_table = "CREATE TABLE %s (s struct<upperCaseName:int>) STORED AS PARQUET" % \
table_name
self.run_stmt_in_hive(create_table)
self.client.execute("invalidate metadata %s" % table_name)
self.client.execute("select s.UppercasenamE from %s" % table_name)
self.client.execute("select s.* from %s" % table_name)
def test_partitioned_table(self, vector, unique_database):
"""IMPALA-6370: Test that a partitioned table with nested types can be scanned."""
table = "complextypes_partitioned"
db_table = "{0}.{1}".format(unique_database, table)
table_format_info = vector.get_value('table_format') # type: TableFormatInfo
file_format = table_format_info.file_format
db_suffix = table_format_info.db_suffix()
self.client.execute("""
CREATE EXTERNAL TABLE {0} (
id BIGINT,
int_array ARRAY<INT>,
int_array_array ARRAY<ARRAY<INT>>,
int_map MAP<STRING,INT>,
int_map_array ARRAY<MAP<STRING,INT>>,
nested_struct STRUCT<
a:INT,
b:ARRAY<INT>,
c:STRUCT<d:ARRAY<ARRAY<STRUCT<e:INT,f:STRING>>>>,
g:MAP<STRING,STRUCT<h:STRUCT<i:ARRAY<DOUBLE>>>>>
)
PARTITIONED BY (
part int
)
STORED AS {1}""".format(db_table, file_format))
# Add multiple partitions pointing to the complextypes_tbl data.
for partition in [1, 2]:
self.client.execute("ALTER TABLE {0} ADD PARTITION(part={1}) LOCATION '{2}'".format(
db_table, partition,
self._get_table_location("functional%s.complextypestbl" % db_suffix, vector)))
self.run_test_case('QueryTest/nested-types-basic-partitioned', vector,
unique_database)
class TestParquetArrayEncodings(ImpalaTestSuite):
TESTFILE_DIR = os.path.join(os.environ['IMPALA_HOME'],
"testdata/parquet_nested_types_encodings")
ARRAY_RESOLUTION_POLICIES = ["three_level", "two_level", "two_level_then_three_level"]
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestParquetArrayEncodings, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(
'parquet_array_resolution', *TestParquetArrayEncodings.ARRAY_RESOLUTION_POLICIES))
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format == 'parquet')
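# Reads the 'parquet_array_resolution' dimension from the test vector and applies
# it as a query option. Returns the policy name and the updated options.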
def __init_arr_res(self, vector):
arr_res = vector.get_value('parquet_array_resolution')
qopts = vector.get_value('exec_option')
qopts['parquet_array_resolution'] = arr_res
return (arr_res, qopts)
# $ parquet-tools schema SingleFieldGroupInList.parquet
# message SingleFieldGroupInList {
# optional group single_element_groups (LIST) {
# repeated group single_element_group {
# required int64 count;
# }
# }
# }
#
# $ parquet-tools cat SingleFieldGroupInList.parquet
# single_element_groups:
# .single_element_group:
# ..count = 1234
# .single_element_group:
# ..count = 2345
def test_single_field_group_in_list(self, vector, unique_database):
self.__test_single_field_group_in_list(unique_database, "SingleFieldGroupInList",
"SingleFieldGroupInList.parquet", vector)
# $ parquet-tools schema AvroSingleFieldGroupInList.parquet
# message AvroSingleFieldGroupInList {
# optional group single_element_groups (LIST) {
# repeated group array {
# required int64 count;
# }
# }
# }
#
# $ parquet-tools cat AvroSingleFieldGroupInList.parquet
# single_element_groups:
# .array:
# ..count = 1234
# .array:
# ..count = 2345
def test_avro_single_field_group_in_list(self, vector, unique_database):
self.__test_single_field_group_in_list(unique_database, "AvroSingleFieldGroupInList",
"AvroSingleFieldGroupInList.parquet", vector)
# $ parquet-tools schema ThriftSingleFieldGroupInList.parquet
# message ThriftSingleFieldGroupInList {
# optional group single_element_groups (LIST) {
# repeated group single_element_groups_tuple {
# required int64 count;
# }
# }
# }
#
# $ parquet-tools cat ThriftSingleFieldGroupInList.parquet
# single_element_groups:
# .single_element_groups_tuple:
# ..count = 1234
# .single_element_groups_tuple:
# ..count = 2345
def test_thrift_single_field_group_in_list(self, vector, unique_database):
self.__test_single_field_group_in_list(unique_database,
"ThriftSingleFieldGroupInList", "ThriftSingleFieldGroupInList.parquet", vector)
def __test_single_field_group_in_list(self, db, tbl, parq_file, vector):
arr_res, qopts = self.__init_arr_res(vector)
full_name = "%s.%s" % (db, tbl)
if arr_res == "two_level" or arr_res == "two_level_then_three_level":
self._create_test_table(db, tbl, parq_file, "col1 array<struct<f1: bigint>>")
result = self.execute_query("select item.f1 from %s.col1" % full_name, qopts)
assert result.data == ['1234', '2345']
result = self.execute_query("select item.f1 from %s t, t.col1" % full_name, qopts)
assert result.data == ['1234', '2345']
if arr_res == "three_level":
self._create_test_table(db, tbl, parq_file, "col1 array<bigint>")
result = self.execute_query("select item from %s.col1" % full_name, qopts)
assert result.data == ['1234', '2345']
result = self.execute_query("select item from %s t, t.col1" % full_name, qopts)
assert result.data == ['1234', '2345']
result = self.execute_query(
"select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name, qopts)
assert result.data == ['2']
# $ parquet-tools schema AvroPrimitiveInList.parquet
# message AvroPrimitiveInList {
# required group list_of_ints (LIST) {
# repeated int32 array;
# }
# }
#
# $ parquet-tools cat AvroPrimitiveInList.parquet
# list_of_ints:
# .array = 34
# .array = 35
# .array = 36
@SkipIfS3.eventually_consistent
def test_avro_primitive_in_list(self, vector, unique_database):
self.__test_primitive_in_list(unique_database, "AvroPrimitiveInList",
"AvroPrimitiveInList.parquet", vector)
# $ parquet-tools schema ThriftPrimitiveInList.parquet
# message ThriftPrimitiveInList {
# required group list_of_ints (LIST) {
# repeated int32 list_of_ints_tuple;
# }
# }
#
# $ parquet-tools cat ThriftPrimitiveInList.parquet
# list_of_ints:
# .list_of_ints_tuple = 34
# .list_of_ints_tuple = 35
# .list_of_ints_tuple = 36
def test_thrift_primitive_in_list(self, vector, unique_database):
self.__test_primitive_in_list(unique_database, "ThriftPrimitiveInList",
"ThriftPrimitiveInList.parquet", vector)
def __test_primitive_in_list(self, db, tbl, parq_file, vector):
arr_res, qopts = self.__init_arr_res(vector)
full_name = "%s.%s" % (db, tbl)
self._create_test_table(db, tbl, parq_file, "col1 array<int>")
if arr_res == "two_level" or arr_res == "two_level_then_three_level":
result = self.execute_query("select item from %s.col1" % full_name, qopts)
assert result.data == ['34', '35', '36']
result = self.execute_query("select item from %s t, t.col1" % full_name, qopts)
assert result.data == ['34', '35', '36']
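# This file uses the 2-level encoding, so 3-level resolution looks one level too
# deep for the item and the values come back NULL, although the collection sizes
# are still counted correctly.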
if arr_res == "three_level":
result = self.execute_query("select item from %s.col1" % full_name, qopts)
assert result.data == ['NULL', 'NULL', 'NULL']
result = self.execute_query("select item from %s t, t.col1" % full_name, qopts)
assert result.data == ['NULL', 'NULL', 'NULL']
result = self.execute_query(
"select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name, qopts)
assert result.data == ['3']
# $ parquet-tools schema bad-avro.parquet
# message org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray {
# required group int_arrays_column (LIST) {
# repeated group array (LIST) {
# repeated int32 array;
# }
# }
# }
#
# $ parquet-tools cat bad-avro.parquet
# int_arrays_column:
# .array:
# ..array = 0
# ..array = 1
# ..array = 2
# .array:
# ..array = 3
# ..array = 4
# ..array = 5
# .array:
# ..array = 6
# ..array = 7
# ..array = 8
#
# int_arrays_column:
# .array:
# ..array = 0
# ..array = 1
# ..array = 2
# .array:
# ..array = 3
# ..array = 4
# ..array = 5
# .array:
# ..array = 6
# ..array = 7
# ..array = 8
#
# [Same int_arrays_column repeated 8x more]
def test_avro_array_of_arrays(self, vector, unique_database):
self.__test_array_of_arrays(unique_database, "AvroArrayOfArrays",
"bad-avro.parquet", vector, 10)
# $ parquet-tools schema bad-thrift.parquet
# message ParquetSchema {
# required group intListsColumn (LIST) {
# repeated group intListsColumn_tuple (LIST) {
# repeated int32 intListsColumn_tuple_tuple;
# }
# }
# }
#
# $ parquet-tools cat bad-thrift.parquet
# intListsColumn:
# .intListsColumn_tuple:
# ..intListsColumn_tuple_tuple = 0
# ..intListsColumn_tuple_tuple = 1
# ..intListsColumn_tuple_tuple = 2
# .intListsColumn_tuple:
# ..intListsColumn_tuple_tuple = 3
# ..intListsColumn_tuple_tuple = 4
# ..intListsColumn_tuple_tuple = 5
# .intListsColumn_tuple:
# ..intListsColumn_tuple_tuple = 6
# ..intListsColumn_tuple_tuple = 7
# ..intListsColumn_tuple_tuple = 8
@SkipIfS3.eventually_consistent
def test_thrift_array_of_arrays(self, vector, unique_database):
self.__test_array_of_arrays(unique_database, "ThriftArrayOfArrays",
"bad-thrift.parquet", vector, 1)
def __test_array_of_arrays(self, db, tbl, parq_file, vector, mult):
arr_res, qopts = self.__init_arr_res(vector)
full_name = "%s.%s" % (db, tbl)
self._create_test_table(db, tbl, parq_file, "col1 array<array<int>>")
if arr_res == "two_level" or arr_res == "two_level_then_three_level":
result = self.execute_query("select item from %s.col1.item" % full_name, qopts)
assert result.data == ['0', '1', '2', '3', '4', '5', '6', '7', '8'] * mult
result = self.execute_query(
"select a2.item from %s t, t.col1 a1, a1.item a2" % full_name)
assert result.data == ['0', '1', '2', '3', '4', '5', '6', '7', '8'] * mult
result = self.execute_query(
"select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name, qopts)
assert result.data == ['3'] * mult
result = self.execute_query(
"select cnt from %s t, t.col1 a1, (select count(*) cnt from a1.item) v"\
% full_name, qopts)
assert result.data == ['3', '3', '3'] * mult
if arr_res == "three_level":
expected_err = "has an incompatible Parquet schema"
try:
self.execute_query("select item from %s.col1.item" % full_name, qopts)
except Exception as e:
assert expected_err in str(e)
try:
self.execute_query("select cnt from %s t, (select count(*) cnt from t.col1) v"\
% full_name, qopts)
except Exception as e:
assert expected_err in str(e)
# $ parquet-tools schema UnannotatedListOfPrimitives.parquet
# message UnannotatedListOfPrimitives {
# repeated int32 list_of_ints;
# }
#
# $ parquet-tools cat UnannotatedListOfPrimitives.parquet
# list_of_ints = 34
# list_of_ints = 35
# list_of_ints = 36
def test_unannotated_list_of_primitives(self, vector, unique_database):
arr_res, qopts = self.__init_arr_res(vector)
tablename = "UnannotatedListOfPrimitives"
full_name = "%s.%s" % (unique_database, tablename)
self._create_test_table(unique_database, tablename,
"UnannotatedListOfPrimitives.parquet", "col1 array<int>")
result = self.execute_query("select item from %s.col1" % full_name, qopts)
assert result.data == ['34', '35', '36']
result = self.execute_query("select item from %s t, t.col1" % full_name, qopts)
assert result.data == ['34', '35', '36']
result = self.execute_query(
"select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name, qopts)
assert result.data == ['3']
# $ parquet-tools schema UnannotatedListOfGroups.parquet
# message UnannotatedListOfGroups {
# repeated group list_of_points {
# required float x;
# required float y;
# }
# }
#
# $ parquet-tools cat UnannotatedListOfGroups.parquet
# list_of_points:
# .x = 1.0
# .y = 1.0
# list_of_points:
# .x = 2.0
# .y = 2.0
def test_unannotated_list_of_groups(self, vector, unique_database):
arr_res, qopts = self.__init_arr_res(vector)
tablename = "UnannotatedListOfGroups"
full_name = "%s.%s" % (unique_database, tablename)
self._create_test_table(unique_database, tablename,
"UnannotatedListOfGroups.parquet", "col1 array<struct<f1: float, f2: float>>")
result = self.execute_query("select f1, f2 from %s.col1" % full_name, qopts)
assert result.data == ['1\t1', '2\t2']
result = self.execute_query("select f1, f2 from %s t, t.col1" % full_name, qopts)
assert result.data == ['1\t1', '2\t2']
result = self.execute_query(
"select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name, qopts)
assert result.data == ['2']
# $ parquet-tools schema AmbiguousList_Modern.parquet
# message org.apache.impala.nested {
# required group ambigArray (LIST) {
# repeated group list {
# required group element {
# required group s2 {
# optional int32 f21;
# optional int32 f22;
# }
# optional int32 F11;
# optional int32 F12;
# }
# }
# }
# }
# $ parquet-tools cat AmbiguousList_Modern.parquet
# ambigArray:
# .list:
# ..element:
# ...s2:
# ....f21 = 21
# ....f22 = 22
# ...F11 = 11
# ...F12 = 12
# .list:
# ..element:
# ...s2:
# ....f21 = 210
# ....f22 = 220
# ...F11 = 110
# ...F12 = 120
#
# $ parquet-tools schema AmbiguousList_Legacy.parquet
# message org.apache.impala.nested {
# required group ambigArray (LIST) {
# repeated group array {
# required group s2 {
# optional int32 f21;
# optional int32 f22;
# }
# optional int32 F11;
# optional int32 F12;
# }
# }
# }
# $ parquet-tools cat AmbiguousList_Legacy.parquet
# ambigArray:
# .array:
# ..s2:
# ...f21 = 21
# ...f22 = 22
# ..F11 = 11
# ..F12 = 12
# .array:
# ..s2:
# ...f21 = 210
# ...f22 = 220
# ..F11 = 110
# ..F12 = 120
def test_ambiguous_list(self, vector, unique_database):
"""IMPALA-4725: Tests the schema-resolution behavior with different values for the
PARQUET_ARRAY_RESOLUTION and PARQUET_FALLBACK_SCHEMA_RESOLUTION query options.
The schema of the Parquet test files is constructed to induce incorrect results
with index-based resolution and the default TWO_LEVEL_THEN_THREE_LEVEL array
resolution policy. Regardless of whether the Parquet data files use the 2-level or
3-level encoding, incorrect results may be returned if the array resolution does
not exactly match the data files' encoding. The name-based policy generally does
not have this problem because it avoids traversing incorrect schema paths.
"""
# The Parquet resolution policy is manually set in the .test files.
if vector.get_value('parquet_array_resolution') != "three_level":
skip("Test only run with three_level")
ambig_modern_tbl = "ambig_modern"
self._create_test_table(unique_database, ambig_modern_tbl,
"AmbiguousList_Modern.parquet",
"ambigarray array<struct<s2:struct<f21:int,f22:int>,f11:int,f12:int>>")
self.run_test_case('QueryTest/parquet-ambiguous-list-modern',
vector, unique_database)
ambig_legacy_tbl = "ambig_legacy"
self._create_test_table(unique_database, ambig_legacy_tbl,
"AmbiguousList_Legacy.parquet",
"ambigarray array<struct<s2:struct<f21:int,f22:int>,f11:int,f12:int>>")
self.run_test_case('QueryTest/parquet-ambiguous-list-legacy',
vector, unique_database)
def _create_test_table(self, dbname, tablename, filename, columns):
"""Creates a table in the given database with the given name and columns. Copies
the file with the given name from TESTFILE_DIR into the table."""
location = get_fs_path("/test-warehouse/%s.db/%s" % (dbname, tablename))
self.client.execute("create table %s.%s (%s) stored as parquet location '%s'" %
(dbname, tablename, columns, location))
local_path = self.TESTFILE_DIR + "/" + filename
self.filesystem_client.copy_from_local(local_path, location)
class TestMaxNestingDepth(ImpalaTestSuite):
# Should be kept in sync with the FE's Type.MAX_NESTING_DEPTH
MAX_NESTING_DEPTH = 100
TABLES = ['struct', 'int_array', 'struct_array', 'int_map', 'struct_map']
TEMP_TABLE_SUFFIX = '_parquet'
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestMaxNestingDepth, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format in ['parquet', 'orc'])
def test_max_nesting_depth(self, vector, unique_database):
"""Tests that Impala can scan Parquet and ORC files having complex types of
the maximum nesting depth."""
file_format = vector.get_value('table_format').file_format
if file_format == 'orc' and not IS_HDFS:
pytest.skip('ORC table loading requires Hive and thus only works on HDFS.')
if file_format == 'parquet':
self.__create_parquet_tables(unique_database)
elif file_format == 'orc':
self.__create_orc_tables(unique_database)
self.run_test_case('QueryTest/max-nesting-depth', vector, unique_database)
def __create_parquet_tables(self, unique_database, as_target=True):
"""Create Parquet tables from files. If 'as_target' is False, the Parquet tables will
be used to create ORC tables, so we add a suffix in the table names."""
self.filesystem_client.copy_from_local(
"%s/testdata/max_nesting_depth" % os.environ['IMPALA_HOME'],
"%s/%s.db/" % (WAREHOUSE, unique_database))
tbl_suffix = '' if as_target else self.TEMP_TABLE_SUFFIX
for tbl in self.TABLES:
tbl_name = "%s.%s_tbl%s" % (unique_database, tbl, tbl_suffix)
tbl_location = "%s/%s.db/max_nesting_depth/%s/" % (WAREHOUSE, unique_database, tbl)
create_table = "CREATE EXTERNAL TABLE %s LIKE PARQUET '%s' STORED AS PARQUET" \
" location '%s'" % (tbl_name, tbl_location + 'file.parq', tbl_location)
self.client.execute(create_table)
def __create_orc_tables(self, unique_database):
# Creating ORC tables directly from ORC files is not yet supported (IMPALA-8046).
# We create the Parquet tables first and then transform them into ORC tables.
self.__create_parquet_tables(unique_database, False)
for tbl in self.TABLES:
tbl_name = "%s.%s_tbl" % (unique_database, tbl)
from_tbl_name = tbl_name + self.TEMP_TABLE_SUFFIX
create_table = "CREATE TABLE %s LIKE %s STORED AS ORC" % (tbl_name, from_tbl_name)
insert_table = "INSERT INTO %s SELECT * FROM %s" % (tbl_name, from_tbl_name)
self.run_stmt_in_hive(create_table)
self.run_stmt_in_hive(insert_table)
self.client.execute("INVALIDATE METADATA %s" % tbl_name)
@SkipIfIsilon.hive
@SkipIfS3.hive
@SkipIfABFS.hive
@SkipIfADLS.hive
@SkipIfLocal.hive
def test_load_hive_table(self, vector, unique_database):
"""Tests that Impala rejects Hive-created tables with complex types that exceed
the maximum nesting depth."""
# Type with a nesting depth of MAX_NESTING_DEPTH + 1
type_sql = ("array<" * self.MAX_NESTING_DEPTH) + "int" +\
(">" * self.MAX_NESTING_DEPTH)
file_format = vector.get_value('table_format').file_format
create_table_sql = "CREATE TABLE %s.above_max_depth (f %s) STORED AS %s" %\
(unique_database, type_sql, file_format)
self.run_stmt_in_hive(create_table_sql)
self.client.execute("invalidate metadata %s.above_max_depth" % unique_database)
try:
self.client.execute("explain select 1 from %s.above_max_depth" % unique_database)
assert False, "Expected table loading to fail."
except ImpalaBeeswaxException as e:
assert "Type exceeds the maximum nesting depth" in str(e)