blob: c19363f8ede7d78f5ed0333a51d40b98459ff988 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Targeted Impala insert tests
import os
from collections import namedtuple
from datetime import datetime
from decimal import Decimal
from shutil import rmtree
from subprocess import check_call
from tempfile import mkdtemp as make_tmp_dir
from parquet.ttypes import ColumnOrder, SortingColumn, TypeDefinedOrder
from tests.common.environ import impalad_basedir
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.parametrize import UniqueDatabase
from tests.common.skip import SkipIfIsilon, SkipIfLocal, SkipIfS3, SkipIfADLS
from tests.common.test_dimensions import create_exec_option_dimension
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import get_fs_path
from tests.util.get_parquet_metadata import get_parquet_metadata, decode_stats_value
# TODO: Add Gzip back. IMPALA-424
PARQUET_CODECS = ['none', 'snappy']
class RoundFloat():
"""Class to compare floats after rounding them to a specified number of digits. This
can be used in scenarios where floating point precision is an issue.
"""
def __init__(self, value, num_digits):
self.value = value
self.num_digits = num_digits
def __eq__(self, numeral):
"""Compares this objects's value to a numeral after rounding it."""
return round(self.value, self.num_digits) == round(numeral, self.num_digits)
class TimeStamp():
"""Class to construct timestamps with a default format specifier."""
def __init__(self, value):
# This member must be called 'timetuple'. Only if this class has a member called
# 'timetuple' will the datetime __eq__ function forward an unknown equality check to
# this method by returning NotImplemented:
# https://docs.python.org/2/library/datetime.html#datetime.datetime
self.timetuple = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
def __eq__(self, other_timetuple):
"""Compares this objects's value to another timetuple."""
return self.timetuple == other_timetuple
ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max'])
# Test a smaller parquet file size as well
# TODO: these tests take a while so we don't want to go through too many sizes but
# we should in more exhaustive testing
PARQUET_FILE_SIZES = [0, 32 * 1024 * 1024]
class TestInsertParquetQueries(ImpalaTestSuite):
@classmethod
def get_workload(self):
return 'tpch'
@classmethod
def add_test_dimensions(cls):
super(TestInsertParquetQueries, cls).add_test_dimensions()
# Fix the exec_option vector to have a single value. This is needed should we decide
# to run the insert tests in parallel (otherwise there will be two tests inserting
# into the same table at the same time for the same file format).
# TODO: When we do decide to run these tests in parallel we could create unique temp
# tables for each test case to resolve the concurrency problems.
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
sync_ddl=[1]))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension("compression_codec", *PARQUET_CODECS))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension("file_size", *PARQUET_FILE_SIZES))
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format == 'parquet')
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').compression_codec == 'none')
@SkipIfLocal.multiple_impalad
@UniqueDatabase.parametrize(sync_ddl=True)
def test_insert_parquet(self, vector, unique_database):
vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = \
vector.get_value('file_size')
vector.get_value('exec_option')['COMPRESSION_CODEC'] = \
vector.get_value('compression_codec')
self.run_test_case('insert_parquet', vector, unique_database, multiple_impalad=True)
class TestInsertParquetInvalidCodec(ImpalaTestSuite):
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestInsertParquetInvalidCodec, cls).add_test_dimensions()
# Fix the exec_option vector to have a single value.
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
sync_ddl=[1]))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension("compression_codec", 'bzip2'))
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format == 'parquet')
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').compression_codec == 'none')
@SkipIfLocal.multiple_impalad
def test_insert_parquet_invalid_codec(self, vector):
vector.get_value('exec_option')['COMPRESSION_CODEC'] = \
vector.get_value('compression_codec')
self.run_test_case('QueryTest/insert_parquet_invalid_codec', vector,
multiple_impalad=True)
class TestInsertParquetVerifySize(ImpalaTestSuite):
@classmethod
def get_workload(self):
return 'tpch'
@classmethod
def add_test_dimensions(cls):
super(TestInsertParquetVerifySize, cls).add_test_dimensions()
# Fix the exec_option vector to have a single value.
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
sync_ddl=[1]))
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format == 'parquet')
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').compression_codec == 'none')
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension("compression_codec", *PARQUET_CODECS))
@SkipIfIsilon.hdfs_block_size
@SkipIfLocal.hdfs_client
def test_insert_parquet_verify_size(self, vector, unique_database):
# Test to verify that the result file size is close to what we expect.
tbl_name = "parquet_insert_size"
fq_tbl_name = unique_database + "." + tbl_name
location = get_fs_path("test-warehouse/{0}.db/{1}/"
.format(unique_database, tbl_name))
create = ("create table {0} like tpch_parquet.orders stored as parquet"
.format(fq_tbl_name, location))
query = "insert overwrite {0} select * from tpch.orders".format(fq_tbl_name)
block_size = 40 * 1024 * 1024
self.execute_query(create)
vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = block_size
vector.get_value('exec_option')['COMPRESSION_CODEC'] =\
vector.get_value('compression_codec')
vector.get_value('exec_option')['num_nodes'] = 1
self.execute_query(query, vector.get_value('exec_option'))
# Get the files in hdfs and verify. There can be at most 1 file that is smaller
# that the block_size. The rest should be within 80% of it and not over.
found_small_file = False
sizes = self.filesystem_client.get_all_file_sizes(location)
for size in sizes:
assert size < block_size, "File size greater than expected.\
Expected: {0}, Got: {1}".format(block_size, size)
if size < block_size * 0.80:
assert not found_small_file
found_small_file = True
class TestHdfsParquetTableWriter(ImpalaTestSuite):
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestHdfsParquetTableWriter, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
def test_def_level_encoding(self, vector, unique_database):
"""IMPALA-3376: Tests that parquet files are written to HDFS correctly by generating a
parquet table and running the parquet-reader tool on it, which performs sanity
checking, such as that the correct number of definition levels were encoded.
"""
table_name = "test_hdfs_parquet_table_writer"
qualified_table_name = "%s.%s" % (unique_database, table_name)
self.execute_query("create table %s stored as parquet as select l_linenumber from "
"tpch_parquet.lineitem limit 180000" % qualified_table_name)
tmp_dir = make_tmp_dir()
try:
hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq'
% (unique_database, table_name))
check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmp_dir])
for root, subdirs, files in os.walk(tmp_dir):
for f in files:
if not f.endswith('parq'):
continue
check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file',
os.path.join(tmp_dir, str(f))])
finally:
self.execute_query("drop table %s" % qualified_table_name)
rmtree(tmp_dir)
def test_sorting_columns(self, vector, unique_database, tmpdir):
"""Tests that RowGroup::sorting_columns gets populated when the table has SORT BY
columns."""
source_table = "functional_parquet.alltypessmall"
target_table = "test_write_sorting_columns"
qualified_target_table = "{0}.{1}".format(unique_database, target_table)
hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format(unique_database,
target_table))
# Create table
query = "create table {0} sort by (int_col, id) like {1} stored as parquet".format(
qualified_target_table, source_table)
self.execute_query(query)
# Insert data
query = ("insert into {0} partition(year, month) select * from {1}").format(
qualified_target_table, source_table)
self.execute_query(query)
# Download hdfs files and extract rowgroup metadata
row_groups = []
check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
for root, subdirs, files in os.walk(tmpdir.strpath):
for f in files:
parquet_file = os.path.join(root, str(f))
file_meta_data = get_parquet_metadata(parquet_file)
row_groups.extend(file_meta_data.row_groups)
# Verify that the files have the sorted_columns set
expected = [SortingColumn(4, False, False), SortingColumn(0, False, False)]
for row_group in row_groups:
assert row_group.sorting_columns == expected
def test_set_column_orders(self, vector, unique_database, tmpdir):
"""Tests that the Parquet writers set FileMetaData::column_orders."""
source_table = "functional_parquet.alltypessmall"
target_table = "test_set_column_orders"
qualified_target_table = "{0}.{1}".format(unique_database, target_table)
hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format(unique_database,
target_table))
# Create table
query = "create table {0} like {1} stored as parquet".format(qualified_target_table,
source_table)
self.execute_query(query)
# Insert data
query = ("insert into {0} partition(year, month) select * from {1}").format(
qualified_target_table, source_table)
self.execute_query(query)
# Download hdfs files and verify column orders
check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
expected_col_orders = [ColumnOrder(TYPE_ORDER=TypeDefinedOrder())] * 11
for root, subdirs, files in os.walk(tmpdir.strpath):
for f in files:
parquet_file = os.path.join(root, str(f))
file_meta_data = get_parquet_metadata(parquet_file)
assert file_meta_data.column_orders == expected_col_orders
@SkipIfIsilon.hive
@SkipIfLocal.hive
@SkipIfS3.hive
@SkipIfADLS.hive
# TODO: Should we move this to test_parquet_stats.py?
class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestHdfsParquetTableStatsWriter, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
def _decode_row_group_stats(self, schemas, row_group_stats):
"""Decodes and return a list of statistics for a single row group."""
decoded = []
assert len(schemas) == len(row_group_stats)
for schema, stats in zip(schemas, row_group_stats):
if stats is None:
decoded.append(None)
continue
if stats.min_value is None and stats.max_value is None:
decoded.append(None)
continue
assert stats.min_value is not None and stats.max_value is not None
min_value = decode_stats_value(schema, stats.min_value)
max_value = decode_stats_value(schema, stats.max_value)
decoded.append(ColumnStats(schema.name, min_value, max_value))
assert len(decoded) == len(schemas)
return decoded
def _get_row_group_stats_from_file(self, parquet_file):
"""Returns a list of statistics for each row group in file 'parquet_file'. The result
is a two-dimensional list, containing stats by row group and column."""
file_meta_data = get_parquet_metadata(parquet_file)
# We only support flat schemas, the additional element is the root element.
schemas = file_meta_data.schema[1:]
file_stats = []
for row_group in file_meta_data.row_groups:
num_columns = len(row_group.columns)
assert num_columns == len(schemas)
column_stats = [c.meta_data.statistics for c in row_group.columns]
file_stats.append(self._decode_row_group_stats(schemas, column_stats))
return file_stats
def _get_row_group_stats_from_hdfs_folder(self, hdfs_path):
"""Returns a list of statistics for each row group in all parquet files in
'hdfs_path'. The result is a two-dimensional list, containing stats by row group and
column."""
row_group_stats = []
try:
tmp_dir = make_tmp_dir()
check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
for root, subdirs, files in os.walk(tmp_dir):
for f in files:
parquet_file = os.path.join(root, str(f))
row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file))
finally:
rmtree(tmp_dir)
return row_group_stats
def _validate_min_max_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
"""Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
statistics in that file match the values in 'expected_values'. Columns indexed by
'skip_col_idx' are excluded from the verification of the expected values.
"""
skip_col_idxs = skip_col_idxs or []
# The caller has to make sure that the table fits into a single row group. We enforce
# it here to make sure the results are predictable and independent of how the data
# could get written across multiple files.
row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
assert(len(row_group_stats)) == 1
table_stats = row_group_stats[0]
num_columns = len(table_stats)
assert num_columns == len(expected_values)
for col_idx, stats, expected in zip(range(num_columns), table_stats, expected_values):
if col_idx in skip_col_idxs:
continue
if not expected:
assert not stats
continue
assert stats == expected
def _ctas_table_and_verify_stats(self, vector, unique_database, source_table,
expected_values):
"""Copies 'source_table' into a parquet table and makes sure that the row group
statistics in the resulting parquet file match those in 'expected_values'.
"""
table_name = "test_hdfs_parquet_table_writer"
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
table_name))
# Setting num_nodes = 1 ensures that the query is executed on the coordinator,
# resulting in a single parquet file being written.
self.execute_query("drop table if exists {0}".format(qualified_table_name))
query = ("create table {0} stored as parquet as select * from {1}").format(
qualified_table_name, source_table)
vector.get_value('exec_option')['num_nodes'] = 1
self.execute_query(query, vector.get_value('exec_option'))
self._validate_min_max_stats(hdfs_path, expected_values)
def test_write_statistics_alltypes(self, vector, unique_database):
"""Test that writing a parquet file populates the rowgroup statistics with the correct
values.
"""
# Expected values for functional.alltypes
expected_min_max_values = [
ColumnStats('id', 0, 7299),
ColumnStats('bool_col', False, True),
ColumnStats('tinyint_col', 0, 9),
ColumnStats('smallint_col', 0, 9),
ColumnStats('int_col', 0, 9),
ColumnStats('bigint_col', 0, 90),
ColumnStats('float_col', 0, RoundFloat(9.9, 1)),
ColumnStats('double_col', 0, RoundFloat(90.9, 1)),
ColumnStats('date_string_col', '01/01/09', '12/31/10'),
ColumnStats('string_col', '0', '9'),
ColumnStats('timestamp_col', TimeStamp('2009-01-01 00:00:00.0'),
TimeStamp('2010-12-31 05:09:13.860000')),
ColumnStats('year', 2009, 2010),
ColumnStats('month', 1, 12),
]
self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
expected_min_max_values)
def test_write_statistics_decimal(self, vector, unique_database):
"""Test that writing a parquet file populates the rowgroup statistics with the correct
values for decimal columns.
"""
# Expected values for functional.decimal_tbl
expected_min_max_values = [
ColumnStats('d1', 1234, 132842),
ColumnStats('d2', 111, 2222),
ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789')),
ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789')),
ColumnStats('d5', Decimal('0.1'), Decimal('12345.789')),
ColumnStats('d6', 1, 1)
]
self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
expected_min_max_values)
def test_write_statistics_multi_page(self, vector, unique_database):
"""Test that writing a parquet file populates the rowgroup statistics with the correct
values. This test write a single parquet file with several pages per column.
"""
# Expected values for tpch_parquet.customer
expected_min_max_values = [
ColumnStats('c_custkey', 1, 150000),
ColumnStats('c_name', 'Customer#000000001', 'Customer#000150000'),
ColumnStats('c_address', ' 2uZwVhQvwA', 'zzxGktzXTMKS1BxZlgQ9nqQ'),
ColumnStats('c_nationkey', 0, 24),
ColumnStats('c_phone', '10-100-106-1617', '34-999-618-6881'),
ColumnStats('c_acctbal', Decimal('-999.99'), Decimal('9999.99')),
ColumnStats('c_mktsegment', 'AUTOMOBILE', 'MACHINERY'),
ColumnStats('c_comment', ' Tiresias according to the slyly blithe instructions '
'detect quickly at the slyly express courts. express dinos wake ',
'zzle. blithely regular instructions cajol'),
]
self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
expected_min_max_values)
def test_write_statistics_null(self, vector, unique_database):
"""Test that we don't write min/max statistics for null columns."""
expected_min_max_values = [
ColumnStats('a', 'a', 'a'),
ColumnStats('b', '', ''),
None,
None,
None,
ColumnStats('f', 'a\x00b', 'a\x00b'),
ColumnStats('g', '\x00', '\x00')
]
self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
expected_min_max_values)
def test_write_statistics_char_types(self, vector, unique_database):
"""Test that Impala correctly writes statistics for char columns."""
table_name = "test_char_types"
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
create_table_stmt = "create table {0} (c3 char(3), vc varchar, st string);".format(
qualified_table_name)
self.execute_query(create_table_stmt)
insert_stmt = """insert into {0} values
(cast("def" as char(3)), "ghj xyz", "abc xyz"),
(cast("abc" as char(3)), "def 123 xyz", "lorem ipsum"),
(cast("xy" as char(3)), "abc banana", "dolor dis amet")""".format(qualified_table_name)
self.execute_query(insert_stmt)
expected_min_max_values = [
ColumnStats('c3', 'abc', 'xy'),
ColumnStats('vc', 'abc banana', 'ghj xyz'),
ColumnStats('st', 'abc xyz', 'lorem ipsum')
]
self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
expected_min_max_values)
def test_write_statistics_negative(self, vector, unique_database):
"""Test that Impala correctly writes statistics for negative values."""
view_name = "test_negative_view"
qualified_view_name = "{0}.{1}".format(unique_database, view_name)
# Create a view to generate test data with negative values by negating every other
# row.
create_view_stmt = """create view {0} as select
id * cast(pow(-1, id % 2) as int) as id,
int_col * cast(pow(-1, id % 2) as int) as int_col,
bigint_col * cast(pow(-1, id % 2) as bigint) as bigint_col,
float_col * pow(-1, id % 2) as float_col,
double_col * pow(-1, id % 2) as double_col
from functional.alltypes""".format(qualified_view_name)
self.execute_query(create_view_stmt)
expected_min_max_values = [
ColumnStats('id', -7299, 7298),
ColumnStats('int_col', -9, 8),
ColumnStats('bigint_col', -90, 80),
ColumnStats('float_col', RoundFloat(-9.9, 1), RoundFloat(8.8, 1)),
ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1)),
]
self._ctas_table_and_verify_stats(vector, unique_database, qualified_view_name,
expected_min_max_values)
def test_write_statistics_multiple_row_groups(self, vector, unique_database):
"""Test that writing multiple row groups works as expected. This is done by inserting
into a table using the SORT BY clause and then making sure that the min and max values
of row groups don't overlap."""
source_table = "tpch_parquet.orders"
target_table = "test_hdfs_parquet_table_writer"
qualified_target_table = "{0}.{1}".format(unique_database, target_table)
hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format(
unique_database, target_table))
# Insert a large amount of data on a single backend with a limited parquet file size.
# This will result in several files being written, exercising code that tracks
# statistics for row groups.
query = "create table {0} sort by (o_orderkey) like {1} stored as parquet".format(
qualified_target_table, source_table)
self.execute_query(query, vector.get_value('exec_option'))
query = ("insert into {0} select * from {1}").format(
qualified_target_table, source_table)
vector.get_value('exec_option')['num_nodes'] = 1
vector.get_value('exec_option')['parquet_file_size'] = 8 * 1024 * 1024
self.execute_query(query, vector.get_value('exec_option'))
# Get all stats for the o_orderkey column
row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
assert len(row_group_stats) > 1
orderkey_stats = [s[0] for s in row_group_stats]
# Make sure that they don't overlap by ordering by the min value, then looking at
# boundaries.
orderkey_stats.sort(key = lambda s: s.min)
for l, r in zip(orderkey_stats, orderkey_stats[1:]):
assert l.max <= r.min
def test_write_statistics_float_infinity(self, vector, unique_database):
"""Test that statistics for -inf and inf are written correctly."""
table_name = "test_float_infinity"
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
create_table_stmt = "create table {0} (f float, d double);".format(
qualified_table_name)
self.execute_query(create_table_stmt)
insert_stmt = """insert into {0} values
(cast('-inf' as float), cast('-inf' as double)),
(cast('inf' as float), cast('inf' as double))""".format(qualified_table_name)
self.execute_query(insert_stmt)
expected_min_max_values = [
ColumnStats('f', float('-inf'), float('inf')),
ColumnStats('d', float('-inf'), float('inf')),
]
self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
expected_min_max_values)