# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Targeted Impala insert tests
import os
from collections import namedtuple
from subprocess import check_call
from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfLocal
from tests.util.filesystem_utils import get_fs_path
from tests.util.get_parquet_metadata import (
decode_stats_value,
get_parquet_metadata,
read_serialized_object
)
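# Maximum length of string values written to the page index. This is assumed to mirror
# the backend truncation limit referred to as PAGE_INDEX_TRUNCATE_LENGTH in the comments
# below.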
PAGE_INDEX_MAX_STRING_LENGTH = 64
@SkipIfLocal.parquet_file_size
class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
"""Since PARQUET-922 page statistics can be written before the footer.
The tests in this class check whether Impala writes the page index correctly.
It is temporarily a custom cluster test suite because we need to set the
enable_parquet_page_index_writing command-line flag for the Impala daemon
in order to make it write the page index.
TODO: IMPALA-5843 Once Impala is able to read the page index and also write it by
default, this test suite should be moved back to query tests.
"""
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestHdfsParquetTableIndexWriter, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
def _get_row_group_from_file(self, parquet_file):
"""Returns namedtuples that contain the schema, stats, offset_index, column_index,
and page_headers for each column in the first row group in file 'parquet_file'. Fails
if the file contains multiple row groups.
"""
ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index',
'column_index', 'page_headers'])
file_meta_data = get_parquet_metadata(parquet_file)
assert len(file_meta_data.row_groups) == 1
# We only support flat schemas; the additional element is the root element.
schemas = file_meta_data.schema[1:]
row_group = file_meta_data.row_groups[0]
assert len(schemas) == len(row_group.columns)
row_group_index = []
with open(parquet_file) as file_handle:
for column, schema in zip(row_group.columns, schemas):
column_index_offset = column.column_index_offset
column_index_length = column.column_index_length
column_index = None
if column_index_offset and column_index_length:
column_index = read_serialized_object(ColumnIndex, file_handle,
column_index_offset, column_index_length)
column_meta_data = column.meta_data
stats = None
if column_meta_data:
stats = column_meta_data.statistics
offset_index_offset = column.offset_index_offset
offset_index_length = column.offset_index_length
offset_index = None
page_headers = []
if offset_index_offset and offset_index_length:
offset_index = read_serialized_object(OffsetIndex, file_handle,
offset_index_offset, offset_index_length)
for page_loc in offset_index.page_locations:
page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset,
page_loc.compressed_page_size)
page_headers.append(page_header)
column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers)
row_group_index.append(column_info)
return row_group_index
def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir):
"""Returns a list of column infos (containing the schema, stats, offset_index,
column_index, and page_headers) for the first row group in all parquet files in
'hdfs_path'.
"""
row_group_indexes = []
check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
for root, subdirs, files in os.walk(tmpdir.strpath):
for f in files:
parquet_file = os.path.join(root, str(f))
row_group_indexes.append(self._get_row_group_from_file(parquet_file))
return row_group_indexes
def _validate_page_locations(self, page_locations):
"""Validate that the page locations are in order."""
for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]):
assert previous_loc.offset < current_loc.offset
assert previous_loc.first_row_index < current_loc.first_row_index
def _validate_null_stats(self, index_size, column_info):
"""Validates the statistics stored in null_pages and null_counts."""
column_index = column_info.column_index
column_stats = column_info.stats
assert column_index.null_pages is not None
assert len(column_index.null_pages) == index_size
assert column_index.null_counts is not None
assert len(column_index.null_counts) == index_size
for page_is_null, null_count, page_header in zip(column_index.null_pages,
column_index.null_counts, column_info.page_headers):
assert page_header.type == PageType.DATA_PAGE
num_values = page_header.data_page_header.num_values
assert not page_is_null or null_count == num_values
if column_stats:
assert column_stats.null_count == sum(column_index.null_counts)
def _validate_min_max_values(self, index_size, column_info):
"""Validate min/max values of the pages in a column chunk."""
column_index = column_info.column_index
min_values = column_info.column_index.min_values
assert len(min_values) == index_size
max_values = column_info.column_index.max_values
assert len(max_values) == index_size
if not column_info.stats:
return
column_min_value_str = column_info.stats.min_value
column_max_value_str = column_info.stats.max_value
if column_min_value_str is None or column_max_value_str is None:
# If either is None, then both need to be None.
assert column_min_value_str is None and column_max_value_str is None
# There are no min and max values, so all pages must be null.
for idx, null_page in enumerate(column_index.null_pages):
assert null_page, "Page {} of column {} is not null, \
but doesn't have min and max values!".format(idx, column_info.schema.name)
# Everything is None, no further checks needed.
return
column_min_value = decode_stats_value(column_info.schema, column_min_value_str)
for null_page, page_min_str in zip(column_index.null_pages, min_values):
if not null_page:
page_min_value = decode_stats_value(column_info.schema, page_min_str)
# If type is str, page_min_value might have been truncated.
if isinstance(page_min_value, basestring):
assert page_min_value >= column_min_value[:len(page_min_value)]
else:
assert page_min_value >= column_min_value
column_max_value = decode_stats_value(column_info.schema, column_max_value_str)
for null_page, page_max_str in zip(column_index.null_pages, max_values):
if not null_page:
page_max_value = decode_stats_value(column_info.schema, page_max_str)
# If type is str, page_max_value might have been truncated and incremented.
if (isinstance(page_max_value, basestring) and
len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH):
max_val_prefix = page_max_value.rstrip('\0')
assert max_val_prefix[:-1] <= column_max_value
else:
assert page_max_value <= column_max_value
def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values):
"""Check if the ordering of the values reflects the value of 'ordering'."""
def is_sorted(l, reverse=False):
if not reverse:
return all(a <= b for a, b in zip(l, l[1:]))
else:
return all(a >= b for a, b in zip(l, l[1:]))
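# Example: min_values [1, 3, 3] with max_values [2, 5, 9] are consistent with ASCENDING.
# The UNORDERED branch below checks that Impala declares UNORDERED only when the values
# are genuinely not sorted in either direction.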
# Filter out null pages and decode the actual min/max values.
actual_min_values = [decode_stats_value(schema, min_val)
for min_val, is_null in zip(min_values, null_pages)
if not is_null]
actual_max_values = [decode_stats_value(schema, max_val)
for max_val, is_null in zip(max_values, null_pages)
if not is_null]
# For ASCENDING and DESCENDING, both min and max values need to be sorted.
if ordering == BoundaryOrder.ASCENDING:
assert is_sorted(actual_min_values)
assert is_sorted(actual_max_values)
elif ordering == BoundaryOrder.DESCENDING:
assert is_sorted(actual_min_values, reverse=True)
assert is_sorted(actual_max_values, reverse=True)
else:
assert ordering == BoundaryOrder.UNORDERED
# For UNORDERED, the min and max values must not both be sorted (in either direction).
assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values)
assert (not is_sorted(actual_min_values, reverse=True) or
not is_sorted(actual_max_values, reverse=True))
def _validate_boundary_order(self, column_info):
"""Validate that min/max values are really in the order specified by
boundary order.
"""
column_index = column_info.column_index
self._validate_ordering(column_index.boundary_order, column_info.schema,
column_index.null_pages, column_index.min_values, column_index.max_values)
def _validate_parquet_page_index(self, hdfs_path, tmpdir):
"""Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
index in that file is in the valid format.
"""
row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir)
for columns in row_group_indexes:
for column_info in columns:
try:
index_size = len(column_info.offset_index.page_locations)
assert index_size > 0
self._validate_page_locations(column_info.offset_index.page_locations)
self._validate_null_stats(index_size, column_info)
self._validate_min_max_values(index_size, column_info)
self._validate_boundary_order(column_info)
except AssertionError as e:
e.args += ("Validation failed on column {}.".format(column_info.schema.name),)
raise
def _ctas_table(self, vector, unique_database, source_db, source_table,
tmpdir, sorting_column=None):
"""Copies the contents of 'source_db.source_table' into a parquet table.
"""
qualified_source_table = "{0}.{1}".format(source_db, source_table)
table_name = source_table
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
# Setting num_nodes = 1 ensures that the query is executed on the coordinator,
# resulting in a single parquet file being written.
vector.get_value('exec_option')['num_nodes'] = 1
self.execute_query("drop table if exists {0}".format(qualified_table_name))
if sorting_column is None:
query = ("create table {0} stored as parquet as select * from {1}").format(
qualified_table_name, qualified_source_table)
else:
query = ("create table {0} sort by({1}) stored as parquet as select * from {2}"
).format(qualified_table_name, sorting_column, qualified_source_table)
self.execute_query(query, vector.get_value('exec_option'))
def _ctas_table_and_verify_index(self, vector, unique_database, source_db, source_table,
tmpdir, sorting_column=None):
"""Copies 'source_table' into a parquet table and makes sure that the index
in the resulting parquet file is valid.
"""
self._ctas_table(vector, unique_database, source_db, source_table, tmpdir,
sorting_column)
hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
source_table))
qualified_source_table = "{0}.{1}".format(unique_database, source_table)
self._validate_parquet_page_index(hdfs_path, tmpdir.join(qualified_source_table))
def _create_table_with_values_of_type(self, col_type, vector, unique_database,
table_name, values_sql):
"""Creates a parquet table that has a single string column, then invokes an insert
statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
It returns the HDFS path for the table.
"""
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
self.execute_query("drop table if exists {0}".format(qualified_table_name))
vector.get_value('exec_option')['num_nodes'] = 1
query = ("create table {0} (x {1}) stored as parquet").format(
qualified_table_name, col_type)
self.execute_query(query, vector.get_value('exec_option'))
self.execute_query("insert into {0} values {1}".format(qualified_table_name,
values_sql), vector.get_value('exec_option'))
return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
table_name))
def test_ctas_tables(self, vector, unique_database, tmpdir):
"""Test different Parquet files created via CTAS statements."""
# Test that writing a parquet file populates the rowgroup indexes with the correct
# values.
self._ctas_table_and_verify_index(vector, unique_database, "functional", "alltypes",
tmpdir)
# Test that writing a parquet file populates the rowgroup indexes with the correct
# values, using decimal types.
self._ctas_table_and_verify_index(vector, unique_database, "functional",
"decimal_tbl", tmpdir)
# Test that writing a parquet file populates the rowgroup indexes with the correct
# values, using date types.
self._ctas_table_and_verify_index(vector, unique_database, "functional", "date_tbl",
tmpdir)
# Test that writing a parquet file populates the rowgroup indexes with the correct
# values, using char types.
self._ctas_table_and_verify_index(vector, unique_database, "functional",
"chars_formats", tmpdir)
# Test that we don't write min/max values in the index for null columns.
# Ensure null_count is set for columns with null values.
self._ctas_table_and_verify_index(vector, unique_database, "functional", "nulltable",
tmpdir)
# Test that when a ColumnChunk is written across multiple pages, the index is
# valid.
self._ctas_table_and_verify_index(vector, unique_database, "tpch", "customer",
tmpdir)
self._ctas_table_and_verify_index(vector, unique_database, "tpch", "orders",
tmpdir)
# Test that when the schema has a sorting column, the index is valid.
self._ctas_table_and_verify_index(vector, unique_database,
"functional_parquet", "zipcode_incomes", tmpdir, "id")
# Test table with wide row.
self._ctas_table_and_verify_index(vector, unique_database,
"functional_parquet", "widerow", tmpdir)
# Test tables with wide rows and many columns.
self._ctas_table_and_verify_index(vector, unique_database,
"functional_parquet", "widetable_250_cols", tmpdir)
self._ctas_table_and_verify_index(vector, unique_database,
"functional_parquet", "widetable_500_cols", tmpdir)
self._ctas_table_and_verify_index(vector, unique_database,
"functional_parquet", "widetable_1000_cols", tmpdir)
def test_max_string_values(self, vector, unique_database, tmpdir):
"""Test string values that are all 0xFFs or end with 0xFFs."""
col_type = "STRING"
# String value is all of 0xFFs but its length is less than PAGE_INDEX_TRUNCATE_LENGTH.
short_tbl = "short_tbl"
short_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
unique_database, short_tbl,
"(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
# String value is all of 0xFFs and its length is PAGE_INDEX_TRUNCATE_LENGTH.
fit_tbl = "fit_tbl"
fit_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
unique_database, fit_tbl,
"(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
# All bytes are 0xFFs and the string is longer than PAGE_INDEX_TRUNCATE_LENGTH, so we
# should not write page statistics.
too_long_tbl = "too_long_tbl"
too_long_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
unique_database, too_long_tbl, "(rpad('', {0}, chr(255)))".format(
PAGE_INDEX_MAX_STRING_LENGTH + 1))
row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
tmpdir.join(too_long_tbl))
column = row_group_indexes[0][0]
assert column.column_index is None
# We always write the offset index
assert column.offset_index is not None
# Test a string value that starts with 'aaa' followed by 0xFFs and whose length is
# greater than PAGE_INDEX_TRUNCATE_LENGTH. The truncated max value should be 'aab'.
aaa_tbl = "aaa_tbl"
aaa_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
unique_database, aaa_tbl,
"(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
tmpdir.join(aaa_tbl))
column = row_group_indexes[0][0]
assert len(column.column_index.max_values) == 1
max_value = column.column_index.max_values[0]
assert max_value == 'aab'
def test_row_count_limit(self, vector, unique_database, tmpdir):
"""Tests that we can set the page row count limit via a query option.
"""
vector.get_value('exec_option')['parquet_page_row_count_limit'] = 20
table_name = "alltypessmall"
self._ctas_table_and_verify_index(vector, unique_database, "functional", table_name,
tmpdir)
hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
table_name))
row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path,
tmpdir.join(table_name))
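# functional.alltypessmall contains 100 rows, a multiple of the 20-row limit, so every
# data page is expected to hold exactly 20 values.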
for row_group in row_group_indexes:
for column in row_group:
for page_header in column.page_headers:
if page_header.data_page_header is not None:
assert page_header.data_page_header.num_values == 20
result_row20 = self.execute_query(
"select * from {0}.{1} order by id".format(unique_database, table_name))
result_orig = self.execute_query(
"select * from functional.alltypessmall order by id")
assert result_row20.data == result_orig.data
def test_row_count_limit_nulls(self, vector, unique_database, tmpdir):
"""Tests that we can set the page row count limit on a table with null values.
"""
vector.get_value('exec_option')['parquet_page_row_count_limit'] = 2
null_tbl = 'null_table'
null_tbl_path = self._create_table_with_values_of_type("STRING", vector,
unique_database, null_tbl, "(NULL), (NULL), ('foo'), (NULL), (NULL), (NULL)")
row_group_indexes = self._get_row_groups_from_hdfs_folder(null_tbl_path,
tmpdir.join(null_tbl))
column = row_group_indexes[0][0]
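# With a 2-row page limit, the six inserted values should be split into three pages:
# [NULL, NULL], ['foo', NULL], [NULL, NULL]; only the middle page is not a null page.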
assert column.column_index.null_pages[0] == True
assert column.column_index.null_pages[1] == False
assert column.column_index.null_pages[2] == True
assert len(column.page_headers) == 3
for page_header in column.page_headers:
assert page_header.data_page_header.num_values == 2
def test_disable_page_index_writing(self, vector, unique_database, tmpdir):
"""Tests that we can disable page index writing via a query option.
"""
vector.get_value('exec_option')['parquet_write_page_index'] = False
table_name = "alltypessmall"
self._ctas_table(vector, unique_database, "functional", table_name, tmpdir)
hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
table_name))
row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path,
tmpdir.join(table_name))
for row_group in row_group_indexes:
for column in row_group:
assert column.offset_index is None
assert column.column_index is None
def test_nan_values_for_floating_types(self, vector, unique_database, tmpdir):
""" IMPALA-7304: Impala doesn't write column index for floating-point columns
until PARQUET-1222 is resolved. This is modified by:
IMPALA-8498: Write column index for floating types when NaN is not present.
"""
for col_type in ["float", "double"]:
nan_val = "(CAST('NaN' as " + col_type + "))"
# Table contains no NaN values.
no_nan_tbl = "no_nan_tbl_" + col_type
no_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
unique_database, no_nan_tbl, "(1.5), (2.3), (4.5), (42.42), (3.1415), (0.0)")
self._validate_parquet_page_index(no_nan_hdfs_path, tmpdir.join(no_nan_tbl))
row_group_indexes = self._get_row_groups_from_hdfs_folder(no_nan_hdfs_path,
tmpdir.join(no_nan_tbl))
column = row_group_indexes[0][0]
assert column.column_index is not None
# Table contains NaN as first value.
first_nan_tbl = "first_nan_tbl_" + col_type
first_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
unique_database, first_nan_tbl,
nan_val + ", (2.3), (4.5), (42.42), (3.1415), (0.0)")
row_group_indexes = self._get_row_groups_from_hdfs_folder(first_nan_hdfs_path,
tmpdir.join(first_nan_tbl))
column = row_group_indexes[0][0]
assert column.column_index is None
# Table contains NaN as last value.
last_nan_tbl = "last_nan_tbl_" + col_type
last_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
unique_database, last_nan_tbl,
"(1.5), (2.3), (42.42), (3.1415), (0.0), " + nan_val)
row_group_indexes = self._get_row_groups_from_hdfs_folder(last_nan_hdfs_path,
tmpdir.join(last_nan_tbl))
column = row_group_indexes[0][0]
assert column.column_index is None
# Table contains NaN value in the middle.
mid_nan_tbl = "mid_nan_tbl_" + col_type
mid_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
unique_database, mid_nan_tbl,
"(2.3), (4.5), " + nan_val + ", (42.42), (3.1415), (0.0)")
row_group_indexes = self._get_row_groups_from_hdfs_folder(mid_nan_hdfs_path,
tmpdir.join(mid_nan_tbl))
column = row_group_indexes[0][0]
assert column.column_index is None
# Table contains only NaN values.
only_nan_tbl = "only_nan_tbl_" + col_type
only_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
unique_database, only_nan_tbl, (nan_val + ", ") * 3 + nan_val)
row_group_indexes = self._get_row_groups_from_hdfs_folder(only_nan_hdfs_path,
tmpdir.join(only_nan_tbl))
column = row_group_indexes[0][0]
assert column.column_index is None