# blob: a331901bc5db0d71b7400bb65112ced66ca7f376 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Targeted tests for Impala joins
#
import pytest
from copy import deepcopy
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import (
SkipIf,
SkipIfIsilon,
SkipIfLocal,
SkipIfS3,
SkipIfABFS,
SkipIfADLS)
from tests.common.test_vector import ImpalaTestDimension
class TestJoinQueries(ImpalaTestSuite):
    """Targeted join tests that run against the 'functional-query' workload."""

    # Values of the 'batch_size' test dimension. The other join suites in this
    # file reference this list as well.
    BATCH_SIZES = [0, 1]

    @classmethod
    def get_workload(cls):
        """Return the name of the workload these tests run against."""
        return 'functional-query'

    @classmethod
    def add_test_dimensions(cls):
        """Add the 'batch_size' dimension and constrain the test matrix.

        Only parquet is kept as a file format, and outside of the exhaustive
        exploration strategy the batch_size == 1 vectors are dropped.
        """
        super(TestJoinQueries, cls).add_test_dimensions()
        batch_size_dimension = ImpalaTestDimension(
            'batch_size', *TestJoinQueries.BATCH_SIZES)
        cls.ImpalaTestMatrix.add_dimension(batch_size_dimension)
        # TODO: Look into splitting up join tests to accommodate hbase.
        # Joins with hbase tables produce drastically different results.
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: v.get_value('table_format').file_format == 'parquet')
        if cls.exploration_strategy() == 'exhaustive':
            return
        # Cut down on execution time when not running in exhaustive mode.
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: v.get_value('batch_size') != 1)

    def test_basic_joins(self, vector):
        """Run QueryTest/joins with the batch size taken from the test vector."""
        test_vector = deepcopy(vector)
        exec_options = test_vector.get_value('exec_option')
        exec_options['batch_size'] = vector.get_value('batch_size')
        self.run_test_case('QueryTest/joins', test_vector)

    def test_single_node_joins_with_limits_exhaustive(self, vector):
        """Run the single-node joins-with-limits queries (exhaustive mode only)."""
        if self.exploration_strategy() != 'exhaustive':
            pytest.skip()
        test_vector = deepcopy(vector)
        exec_options = test_vector.get_value('exec_option')
        exec_options['num_nodes'] = 1
        # The .test file sets batch_size itself, so drop it from the options.
        del exec_options['batch_size']
        self.run_test_case(
            'QueryTest/single-node-joins-with-limits-exhaustive', test_vector)

    @SkipIfS3.hbase
    @SkipIfABFS.hbase
    @SkipIfADLS.hbase
    @SkipIfIsilon.hbase
    @SkipIf.skip_hbase
    @SkipIfLocal.hbase
    def test_joins_against_hbase(self, vector):
        """Run QueryTest/joins-against-hbase with the vector's batch size."""
        test_vector = deepcopy(vector)
        exec_options = test_vector.get_value('exec_option')
        exec_options['batch_size'] = vector.get_value('batch_size')
        self.run_test_case('QueryTest/joins-against-hbase', test_vector)

    def test_outer_joins(self, vector):
        """Run QueryTest/outer-joins with the vector's batch size."""
        test_vector = deepcopy(vector)
        exec_options = test_vector.get_value('exec_option')
        exec_options['batch_size'] = vector.get_value('batch_size')
        self.run_test_case('QueryTest/outer-joins', test_vector)

    def test_single_node_nested_loop_joins(self, vector):
        """Run QueryTest/single-node-nlj with num_nodes forced to 1.

        Covers the execution of nested-loops joins for join types that can only
        be executed in a single node (right [outer|semi|anti] and full outer
        joins).
        """
        test_vector = deepcopy(vector)
        exec_options = test_vector.get_value('exec_option')
        exec_options['num_nodes'] = 1
        self.run_test_case('QueryTest/single-node-nlj', test_vector)

    def test_single_node_nested_loop_joins_exhaustive(self, vector):
        """Run QueryTest/single-node-nlj-exhaustive (exhaustive mode only)."""
        if self.exploration_strategy() != 'exhaustive':
            pytest.skip()
        test_vector = deepcopy(vector)
        exec_options = test_vector.get_value('exec_option')
        exec_options['num_nodes'] = 1
        self.run_test_case('QueryTest/single-node-nlj-exhaustive', test_vector)

    def test_empty_build_joins(self, vector):
        """Run QueryTest/empty-build-joins with the vector's batch size."""
        test_vector = deepcopy(vector)
        exec_options = test_vector.get_value('exec_option')
        exec_options['batch_size'] = vector.get_value('batch_size')
        self.run_test_case('QueryTest/empty-build-joins', test_vector)
class TestTPCHJoinQueries(ImpalaTestSuite):
    """Join tests that run against the 'tpch' workload.

    Uses the TPC-H dataset in order to have larger joins. Needed for example to
    test the repartitioning codepaths.
    """

    @classmethod
    def get_workload(cls):
        """Return the name of the workload these tests run against."""
        return 'tpch'

    @classmethod
    def add_test_dimensions(cls):
        """Add the 'batch_size' dimension and constrain the test matrix.

        Only parquet is kept as a file format, and outside of the exhaustive
        exploration strategy the batch_size == 1 vectors are dropped.
        """
        super(TestTPCHJoinQueries, cls).add_test_dimensions()
        cls.ImpalaTestMatrix.add_dimension(
            ImpalaTestDimension('batch_size', *TestJoinQueries.BATCH_SIZES))
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: v.get_value('table_format').file_format == 'parquet')
        if cls.exploration_strategy() != 'exhaustive':
            # Cut down on execution time when not running in exhaustive mode.
            cls.ImpalaTestMatrix.add_constraint(
                lambda v: v.get_value('batch_size') != 1)

    @classmethod
    def teardown_class(cls):
        """Reset mem_limit on the shared client, then run the base teardown.

        The base-class teardown is invoked from a ``finally`` clause so that it
        still runs when the reset query raises; the exception then propagates
        to the caller as before.
        """
        try:
            cls.client.execute('set mem_limit = 0')
        finally:
            super(TestTPCHJoinQueries, cls).teardown_class()

    def test_outer_joins(self, vector):
        """Run tpch-outer-joins with the batch size taken from the test vector."""
        new_vector = deepcopy(vector)
        exec_options = new_vector.get_value('exec_option')
        exec_options['batch_size'] = vector.get_value('batch_size')
        self.run_test_case('tpch-outer-joins', new_vector)
class TestSemiJoinQueries(ImpalaTestSuite):
    """Semi/anti-join tests that run against the 'functional-query' workload."""

    @classmethod
    def get_workload(cls):
        """Return the name of the workload these tests run against."""
        return 'functional-query'

    @classmethod
    def add_test_dimensions(cls):
        """Add the 'batch_size' dimension and constrain the test matrix.

        Only parquet is kept as a file format, and outside of the exhaustive
        exploration strategy the batch_size == 1 vectors are dropped.
        """
        super(TestSemiJoinQueries, cls).add_test_dimensions()
        batch_size_dimension = ImpalaTestDimension(
            'batch_size', *TestJoinQueries.BATCH_SIZES)
        cls.ImpalaTestMatrix.add_dimension(batch_size_dimension)
        # Joins with hbase tables produce drastically different results.
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: v.get_value('table_format').file_format == 'parquet')
        if cls.exploration_strategy() == 'exhaustive':
            return
        # Cut down on execution time when not running in exhaustive mode.
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: v.get_value('batch_size') != 1)

    def __load_semi_join_tables(self, db_name):
        """Create and load fresh SemiJoinTblA / SemiJoinTblB tables in db_name.

        Each table is created and then populated one row per statement, in the
        order listed below.
        """
        create_stmt = 'create table %s (a int, b int, c int)'
        # (table name template, insert statements for that table)
        table_specs = (
            ('%s.SemiJoinTblA', (
                'insert into %s values(1,1,1)',
                'insert into %s values(1,1,10)',
                'insert into %s values(1,2,10)',
                'insert into %s values(1,3,10)',
                'insert into %s values(NULL,NULL,30)',
                'insert into %s values(2,4,30)',
                'insert into %s values(2,NULL,20)',
            )),
            ('%s.SemiJoinTblB', (
                'insert into %s values(1,1,1)',
                'insert into %s values(1,1,10)',
                'insert into %s values(1,2,5)',
                'insert into %s values(1,NULL,10)',
                'insert into %s values(2,10,NULL)',
                'insert into %s values(3,NULL,NULL)',
                'insert into %s values(3,NULL,50)',
            )),
        )
        for name_template, insert_stmts in table_specs:
            fq_tbl_name = name_template % db_name
            self.client.execute(create_stmt % fq_tbl_name)
            for insert_stmt in insert_stmts:
                self.client.execute(insert_stmt % fq_tbl_name)

    def test_semi_joins(self, vector, unique_database):
        """Load the semi-join tables into unique_database and run QueryTest/semi-joins."""
        test_vector = deepcopy(vector)
        exec_options = test_vector.get_value('exec_option')
        exec_options['batch_size'] = vector.get_value('batch_size')
        self.__load_semi_join_tables(unique_database)
        self.run_test_case('QueryTest/semi-joins', test_vector, unique_database)

    @pytest.mark.execute_serially
    def test_semi_joins_exhaustive(self, vector):
        """Expensive and memory-intensive semi-join tests."""
        if self.exploration_strategy() != 'exhaustive':
            pytest.skip()
        self.run_test_case('QueryTest/semi-joins-exhaustive', vector)