blob: c4e31e933f15c52b0cd3406dac1f5c8a2bd5dc01 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Common test dimensions and associated utility functions.
import copy
import os
from itertools import product
from tests.common.test_vector import ImpalaTestDimension, ImpalaTestVector
# Describes the configuration used to execute a single tests. Contains both the details
# of what specific table format to target along with the exec options (num_nodes, etc)
# to use when running the query.
class TableFormatInfo(object):
KNOWN_FILE_FORMATS = ['text', 'seq', 'rc', 'parquet', 'orc', 'avro', 'hbase']
if os.environ['KUDU_IS_SUPPORTED'] == 'true':
KNOWN_COMPRESSION_CODECS = ['none', 'snap', 'gzip', 'bzip', 'def', 'lzo', 'zstd', 'lz4']
KNOWN_COMPRESSION_TYPES = ['none', 'block', 'record']
def __init__(self, **kwargs):
self.dataset = kwargs.get('dataset', 'UNKNOWN')
self.file_format = kwargs.get('file_format', 'text')
self.compression_codec = kwargs.get('compression_codec', 'none')
self.compression_type = kwargs.get('compression_type', 'none')
def __validate(self):
if self.file_format not in TableFormatInfo.KNOWN_FILE_FORMATS:
raise ValueError, 'Unknown file format: %s' % self.file_format
if self.compression_codec not in TableFormatInfo.KNOWN_COMPRESSION_CODECS:
raise ValueError, 'Unknown compression codec: %s' % self.compression_codec
if self.compression_type not in TableFormatInfo.KNOWN_COMPRESSION_TYPES:
raise ValueError, 'Unknown compression type: %s' % self.compression_type
if (self.compression_codec == 'none' or self.compression_type == 'none') and\
self.compression_codec != self.compression_type:
raise ValueError, 'Invalid combination of compression codec/type: %s' % str(self)
def create_from_string(dataset, table_format_string):
Parses a table format string and creates a table format info object from the string
Expected input is file_format/compression_codec/[compression_type]. The
compression_type is optional, defaulting to 'block' if the table is compressed
or 'none' if the table is uncompressed.
if table_format_string is None:
raise ValueError, 'Table format string cannot be None'
format_parts = table_format_string.strip().split('/')
if len(format_parts) not in range(2, 4):
raise ValueError, 'Invalid table format %s' % table_format_string
file_format, compression_codec = format_parts[:2]
if len(format_parts) == 3:
compression_type = format_parts[2]
# Assume the default compression type is block (of the table is compressed)
compression_type = 'none' if compression_codec == 'none' else 'block'
return TableFormatInfo(dataset=dataset, file_format=file_format,
def __str__(self):
compression_str = '%s/%s' % (self.compression_codec, self.compression_type)
if self.compression_codec == 'none' and self.compression_type == 'none':
compression_str = 'none'
return '%s/%s' % (self.file_format, compression_str)
def db_suffix(self):
if self.file_format == 'text' and self.compression_codec == 'none':
return ''
elif self.compression_codec == 'none':
return '_%s' % (self.file_format)
elif self.compression_type == 'record':
return '_%s_record_%s' % (self.file_format, self.compression_codec)
return '_%s_%s' % (self.file_format, self.compression_codec)
def create_uncompressed_text_dimension(workload):
dataset = get_dataset_from_workload(workload)
return ImpalaTestDimension('table_format',
TableFormatInfo.create_from_string(dataset, 'text/none'))
def create_parquet_dimension(workload):
dataset = get_dataset_from_workload(workload)
return ImpalaTestDimension('table_format',
TableFormatInfo.create_from_string(dataset, 'parquet/none'))
def create_avro_snappy_dimension(workload):
dataset = get_dataset_from_workload(workload)
return ImpalaTestDimension('table_format',
TableFormatInfo.create_from_string(dataset, 'avro/snap/block'))
def create_client_protocol_dimension():
# IMPALA-8864: Older python versions do not support SSLContext object that the thrift
# http client implementation depends on. Falls back to a dimension without http
# transport.
import ssl
if not hasattr(ssl, "create_default_context"):
return ImpalaTestDimension('protocol', 'beeswax', 'hs2')
return ImpalaTestDimension('protocol', 'beeswax', 'hs2', 'hs2-http')
def hs2_parquet_constraint(v):
"""Constraint function, used to only run HS2 against Parquet format, because file format
and the client protocol are orthogonal."""
return (v.get_value('protocol') == 'beeswax' or
v.get_value('table_format').file_format == 'parquet' and
v.get_value('table_format').compression_codec == 'none')
def hs2_text_constraint(v):
"""Constraint function, used to only run HS2 against uncompressed text, because file
format and the client protocol are orthogonal."""
return (v.get_value('protocol') == 'beeswax' or
v.get_value('table_format').file_format == 'text' and
v.get_value('table_format').compression_codec == 'none')
# Common sets of values for the exec option vectors
# Don't run with NUM_NODES=1 due to IMPALA-561
def create_single_exec_option_dimension(num_nodes=0, disable_codegen_rows_threshold=5000):
"""Creates an exec_option dimension that will produce a single test vector"""
return create_exec_option_dimension(cluster_sizes=[num_nodes],
# Make sure codegen kicks in for functional.alltypes.
def create_exec_option_dimension(cluster_sizes=ALL_CLUSTER_SIZES,
sync_ddl=None, exec_single_node_option=[0],
# We already run with codegen on and off explicitly -
# don't need automatic toggling.
exec_option_dimensions = {
'abort_on_error': [1],
'exec_single_node_rows_threshold': exec_single_node_option,
'batch_size': batch_sizes,
'disable_codegen': disable_codegen_options,
'disable_codegen_rows_threshold': disable_codegen_rows_threshold_options,
'num_nodes': cluster_sizes}
if sync_ddl is not None:
exec_option_dimensions['sync_ddl'] = sync_ddl
if debug_action_options is not None:
exec_option_dimensions['debug_action'] = debug_action_options
return create_exec_option_dimension_from_dict(exec_option_dimensions)
def create_exec_option_dimension_from_dict(exec_option_dimensions):
Builds a query exec option test dimension
Exhaustively goes through all combinations of the given query option values.
For each combination create an exec option dictionary and add it as a value in the
exec option test dimension. Each dictionary can then be passed via Beeswax to control
Impala query execution behavior.
TODO: In the future we could generate these values using pairwise to reduce total
execution time.
# Generate the cross product (all combinations) of the exec options specified. Then
# store them in exec_option dictionary format.
keys = sorted(exec_option_dimensions)
combinations = product(*(exec_option_dimensions[name] for name in keys))
exec_option_dimension_values = [dict(zip(keys, prod)) for prod in combinations]
# Build a test vector out of it
return ImpalaTestDimension('exec_option', *exec_option_dimension_values)
def add_exec_option_dimension(test_suite, key, value):
Takes an ImpalaTestSuite object 'test_suite' and adds 'key=value' to every exec option
test dimension, leaving the number of tests that will be run unchanged.
for v in test_suite.ImpalaTestMatrix.dimensions["exec_option"]:
v.value[key] = value
def extend_exec_option_dimension(test_suite, key, value):
Takes an ImpalaTestSuite object 'test_suite' and extends the exec option test dimension
by creating a copy of each existing exec option value that has 'key' set to 'value',
doubling the number of tests that will be run.
dim = test_suite.ImpalaTestMatrix.dimensions["exec_option"]
new_value = []
for v in dim:
new_value.append(ImpalaTestVector.Value(, copy.copy(v.value)))
new_value[-1].value[key] = value
def get_dataset_from_workload(workload):
# TODO: We need a better way to define the workload -> dataset mapping so we can
# extract it without reading the actual test vector file
return load_table_info_dimension(workload, 'exhaustive')[0].value.dataset
def load_table_info_dimension(workload_name, exploration_strategy, file_formats=None,
"""Loads test vector corresponding to the given workload and exploration strategy"""
test_vector_file = os.path.join(
WORKLOAD_DIR, workload_name, '%s_%s.csv' % (workload_name, exploration_strategy))
if not os.path.isfile(test_vector_file):
raise RuntimeError, 'Vector file not found: ' + test_vector_file
vector_values = []
with open(test_vector_file, 'rb') as vector_file:
for line in vector_file.readlines():
if line.strip().startswith('#'):
# Extract each test vector and add them to a dictionary
vals = dict((key.strip(), value.strip()) for key, value in\
(item.split(':') for item in line.split(',')))
# Skip Kudu if Kudu is not supported (IMPALA-4287).
if os.environ['KUDU_IS_SUPPORTED'] != 'true' and vals['file_format'] == 'kudu':
# If only loading specific file formats skip anything that doesn't match
if file_formats is not None and vals['file_format'] not in file_formats:
if compression_codecs is not None and\
vals['compression_codec'] not in compression_codecs:
return ImpalaTestDimension('table_format', *vector_values)
def is_supported_insert_format(table_format):
# Returns true if the given table_format is a supported Impala INSERT format
return table_format.compression_codec == 'none' and\
table_format.file_format in ['text', 'parquet']