blob: 2c92c72ad940fc0969911c158f4f365b13253933 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Functional tests for LOAD DATA statements.
import time
from copy import deepcopy
from tests.common.impala_connection import FINISHED, PENDING, RUNNING
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import (
create_client_protocol_dimension,
create_exec_option_dimension,
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
from tests.common.skip import SkipIfLocal
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import get_fs_path, WAREHOUSE, IS_HDFS
TEST_TBL_PART = "test_load"
TEST_TBL_NOPART = "test_load_nopart"
TEST_TBL_NOPART_EXT = "test_load_nopart_ext"
STAGING_PATH = '%s/test_load_staging' % WAREHOUSE
ALLTYPES_PATH = "%s/alltypes/year=2010/month=1/100101.txt" % WAREHOUSE
MULTIAGG_PATH = '%s/alltypesaggmultifiles/year=2010/month=1/day=1' % WAREHOUSE
HIDDEN_FILES = ["{0}/3/.100101.txt".format(STAGING_PATH),
"{0}/3/_100101.txt".format(STAGING_PATH)]
# A path outside WAREHOUSE, which will be a different bucket for Ozone/ofs.
TMP_STAGING_PATH = get_fs_path('/tmp/test_load_staging')
@SkipIfLocal.hdfs_client
class TestLoadData(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestLoadData, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
def _clean_test_tables(self):
self.client.execute("drop table if exists functional.{0}".format(TEST_TBL_NOPART))
self.client.execute("drop table if exists functional.{0}".format(TEST_TBL_PART))
self.filesystem_client.delete_file_dir(STAGING_PATH, recursive=True)
def teardown_method(self, method):
self._clean_test_tables()
super().teardown_method(method)
def setup_method(self, method):
super().setup_method(method)
# Defensively clean the data dirs if they exist.
self._clean_test_tables()
# Create staging directories for load data inpath. The staging directory is laid out
# as follows:
# - It has 6 sub directories, numbered 1-6
# - The directories are populated with files from a subset of partitions in
# existing partitioned tables.
# - Sub Directories 1-4 have single files copied from alltypes/
# - Sub Directories 5-6 have multiple files (4) copied from alltypesaggmultifiles
# - Sub Directory 3 also has hidden files, in both supported formats.
# - All sub-dirs contain a hidden directory
for i in range(1, 6):
stagingDir = '{0}/{1}'.format(STAGING_PATH, i)
self.filesystem_client.make_dir(stagingDir, permission=777)
self.filesystem_client.make_dir('{0}/_hidden_dir'.format(stagingDir),
permission=777)
# Copy single file partitions from alltypes.
for i in range(1, 4):
self.filesystem_client.copy(ALLTYPES_PATH,
"{0}/{1}/100101.txt".format(STAGING_PATH, i))
# Copy multi file partitions from alltypesaggmultifiles.
file_names = self.filesystem_client.ls(MULTIAGG_PATH)
for i in range(4, 6):
for file_ in file_names:
self.filesystem_client.copy(
"{0}/{1}".format(MULTIAGG_PATH, file_),
'{0}/{1}/{2}'.format(STAGING_PATH, i, file_))
# Create two hidden files, with a leading . and _
for file_ in HIDDEN_FILES:
self.filesystem_client.copy(ALLTYPES_PATH, file_)
# Create both the test tables.
self.client.execute("create table functional.{0} like functional.alltypes"
" location '{1}/{0}'".format(TEST_TBL_PART, WAREHOUSE))
self.client.execute("create table functional.{0} like functional.alltypesnopart"
" location '{1}/{0}'".format(TEST_TBL_NOPART, WAREHOUSE))
def test_load(self, vector):
self.run_test_case('QueryTest/load', vector)
# The hidden files should not have been moved as part of the load operation.
for file_ in HIDDEN_FILES:
assert self.filesystem_client.exists(file_), "{0} does not exist".format(file_)
@SkipIfLocal.hdfs_client
class TestLoadDataExternal(ImpalaTestSuite):
def _clean_test_tables(self):
self.client.execute("drop table if exists functional.{0}".format(TEST_TBL_NOPART_EXT))
self.filesystem_client.delete_file_dir(TMP_STAGING_PATH, recursive=True)
def teardown_method(self, method):
self._clean_test_tables()
super().teardown_method(method)
def setup_method(self, method):
super().setup_method(method)
# Defensively clean the data dirs if they exist.
self._clean_test_tables()
self.filesystem_client.make_dir(TMP_STAGING_PATH)
self.filesystem_client.copy(ALLTYPES_PATH, "{0}/100101.txt".format(TMP_STAGING_PATH))
self.client.execute("create table functional.{0} like functional.alltypesnopart"
" location '{1}/{0}'".format(TEST_TBL_NOPART_EXT, WAREHOUSE))
def test_load(self):
self.execute_query_expect_success(self.client, "load data inpath '{0}/100101.txt'"
" into table functional.{1}".format(TMP_STAGING_PATH, TEST_TBL_NOPART_EXT))
result = self.execute_scalar(
"select count(*) from functional.{0}".format(TEST_TBL_NOPART_EXT))
assert(result == '310')
@SkipIfLocal.hdfs_client
class TestAsyncLoadData(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestAsyncLoadData, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
# Test all clients: hs2, hs2-http and beeswax
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
# Test two exec modes per client
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('enable_async_load_data_execution', True, False))
# Disable codegen = false
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
disable_codegen_options=[False]))
# This test subjects the load into either sync or async compilation of the load
# query at the backend through beewax or hs2 clients. The objective is to assure
# the load query completes successfully.
def test_async_load(self, vector, unique_database):
enable_async_load_data = vector.get_value('enable_async_load_data_execution')
protocol = vector.get_value('protocol')
client = self.create_impala_client(protocol=protocol)
# Form a fully qualified table name with '-' in protocol 'hs2-http' dropped as
# '-' is not allowed in Impala table name even delimited with ``.
qualified_table_name = '{0}.{1}_{2}_{3}'.format(unique_database, TEST_TBL_NOPART,
protocol if protocol != 'hs2-http' else 'hs2http', enable_async_load_data)
# Form a staging path that is protocol and enable_async_load_data dependent to
# allow parallel creating distinct HDFS directories for each test object.
staging_path = "{0}_{1}_{2}".format(STAGING_PATH, protocol, enable_async_load_data)
# Put some data into the staging path
self.filesystem_client.delete_file_dir(staging_path, recursive=True)
self.filesystem_client.make_dir(staging_path, permission=777)
self.filesystem_client.copy(ALLTYPES_PATH, "{0}/100101.txt".format(staging_path))
# Create a table with the staging path
self.client.execute("create table {0} like functional.alltypesnopart \
location \'{1}\'".format(qualified_table_name, staging_path))
try:
# The load data is going to need the metadata of the table. To avoid flakiness
# about metadata loading, this selects from the table first to get the metadata
# loaded.
self.execute_query_expect_success(client,
"select count(*) from {0}".format(qualified_table_name))
# Configure whether to use async LOAD and add an appropriate delay of 3 seconds
new_vector = deepcopy(vector)
new_vector.get_value('exec_option')['enable_async_load_data_execution'] = \
enable_async_load_data
delay = "CRS_DELAY_BEFORE_LOAD_DATA:SLEEP@3000"
new_vector.get_value('exec_option')['debug_action'] = "{0}".format(delay)
load_stmt = "load data inpath \'{1}\' \
into table {0}".format(qualified_table_name, staging_path)
exec_start = time.time()
handle = self.execute_query_async_using_client(client, load_stmt, new_vector)
exec_end = time.time()
exec_time = exec_end - exec_start
exec_end_state = client.get_impala_exec_state(handle)
# Wait for the statement to finish with a timeout of 20 seconds
# (30 seconds without shortcircuit reads)
wait_time = 20 if IS_HDFS else 30
wait_start = time.time()
client.wait_for_impala_state(handle, FINISHED, wait_time)
wait_end = time.time()
wait_time = wait_end - wait_start
self.close_query_using_client(client, handle)
if enable_async_load_data:
# In async mode:
# The compilation of LOAD is processed in the exec step without delay. And the
# processing of the LOAD plan is in wait step with delay. Relax the wait time
# to at least 2 seconds because wait_start not strictly starts at the point
# when the async_exec_thread_ starts.
assert (exec_end_state == PENDING or exec_end_state == RUNNING)
assert (wait_time > 2)
else:
# In sync mode:
# The entire LOAD is processed in the exec step with delay. exec_time should be
# more than 3 seconds. Since the load query is submitted async, it is possible
# that the exec state returned is still in RUNNING state due to the the wait-for
# thread executing ClientRequestState::Wait() does not have time to set the
# exec state from RUNNING to FINISH.
assert (exec_end_state == RUNNING or exec_end_state == FINISHED)
assert (exec_time >= 3)
finally:
client.close()
self.client.execute("drop table if exists {0}".format(qualified_table_name))
self.filesystem_client.delete_file_dir(staging_path, recursive=True)