blob: c5576beceff140d618c81abb17980c1c96f8b1ec [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
import time
import traceback
from multiprocessing.pool import ThreadPool
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfHive2
# Number of INSERT OVERWRITE rounds the Hive writer thread performs per test table.
NUM_OVERWRITES = 2
# Number of plain INSERT INTO statements issued after each overwrite.
NUM_INSERTS_PER_OVERWRITE = 4
class TestAcidStress(ImpalaTestSuite):
  """Stress test that checks Impala can consistently read an insert-only ACID table
  that is concurrently written by Hive.

  One thread (over)writes the table from Hive while another thread polls it from
  Impala. Consistency is checked via two monotonically meaningful columns:
  'run' (incremented by each INSERT OVERWRITE) and 'i' (incremented by each
  INSERT INTO within a run).
  """

  @classmethod
  def get_workload(cls):
    # NOTE: this is a classmethod; the first parameter was previously (and
    # misleadingly) named 'self'.
    return 'targeted-stress'

  @classmethod
  def add_test_dimensions(cls):
    super(TestAcidStress, cls).add_test_dimensions()
    # Could be moved to exhaustive tests due to the long execution time, but this only
    # runs with Hive3, where the main goal currently is to make ACID work, so it is
    # better to run this frequently.
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: (v.get_value('table_format').file_format == 'text' and
                   v.get_value('table_format').compression_codec == 'none'))

  def __verify_result(self, result, expected_result):
    """Verify invariants for 'run' and 'i' in a snapshot 'result' of the table.

    'expected_result' is a dict with keys "run" and "i" holding the most recently
    verified state; it is updated in place to reflect this snapshot.
    """
    assert len(result.data) > 0
    run_max = -1
    i_list = []
    for line in result.data:
      # Each row is "run\ti".
      [run, i] = map(int, (line.split('\t')))
      run_max = max(run_max, run)
      i_list.append(i)
    assert expected_result["run"] <= run_max  # shouldn't see data overwritten in the past
    i_list.sort()
    if expected_result["run"] < run_max:
      # A newer overwrite replaced the data; restart the 'i' expectations.
      expected_result["run"] = run_max
      expected_result["i"] = 0
      return
    assert i_list[-1] >= expected_result["i"]
    # 'i' should have all values from 0 to max_i. BUG FIX: wrap range() in list()
    # so the comparison also holds on Python 3, where range() is not a list.
    assert i_list == list(range(i_list[-1] + 1))
    expected_result["i"] = i_list[-1]

  def __hive_role_write_hive_inserts(self, tbl_name, partitioned):
    """INSERT INTO/OVERWRITE a table several times from Hive.

    Each run first overwrites the table with a single (run, 0) row, then appends
    rows (run, 1) .. (run, NUM_INSERTS_PER_OVERWRITE).
    """
    part_expr = "partition (p=1)" if partitioned else ""
    # NOTE: xrange() was replaced with range() for Python 3 compatibility; the
    # ranges here are tiny, so materializing a list on Python 2 is harmless.
    for run in range(0, NUM_OVERWRITES):
      OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
          """ % (tbl_name, part_expr, run, 0)
      self.run_stmt_in_hive(OVERWRITE_SQL)
      for i in range(1, NUM_INSERTS_PER_OVERWRITE + 1):
        INSERT_SQL = """insert into table %s %s values (%i, %i)
            """ % (tbl_name, part_expr, run, i)
        self.run_stmt_in_hive(INSERT_SQL)

  def __impala_role_read_hive_inserts(self, tbl_name):
    """SELECT from a table many times until the expected final values are found."""
    expected_result = {"run": -1, "i": 0}
    accept_empty_table = True
    # The final state written by the Hive thread is run == NUM_OVERWRITES - 1 and
    # i == NUM_INSERTS_PER_OVERWRITE. BUG FIX: the previous condition compared
    # 'run' against NUM_OVERWRITES (a value 'run' can never reach) and exited only
    # when the transient value i == NUM_INSERTS_PER_OVERWRITE - 1 happened to be
    # observed; a poll could skip over it and leave the loop spinning until the
    # thread-pool timeout. Loop until the true final state is seen instead.
    while expected_result["run"] < NUM_OVERWRITES - 1 or \
        expected_result["i"] < NUM_INSERTS_PER_OVERWRITE:
      time.sleep(3)  # Hive inserts usually take a few seconds
      self.client.execute("refresh %s" % tbl_name)
      result = self.client.execute("select run, i from %s" % tbl_name)
      if len(result.data) == 0:
        # The table may legitimately be empty only before Hive's first write.
        assert accept_empty_table
        continue
      accept_empty_table = False
      self.__verify_result(result, expected_result)

  def __run_test_read_hive_inserts(self, unique_database, partitioned):
    """Check that Impala can read a single insert only ACID table (over)written by Hive
    several times. Consistency can be checked by using incremental values for
    overwrites ('run') and inserts ('i').
    """
    tbl_name = "%s.test_read_hive_inserts" % unique_database
    part_expr = "partitioned by (p int)" if partitioned else ""
    CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
         'transactional_properties' = 'insert_only', 'transactional' = 'true')
         """ % (tbl_name, part_expr)
    self.client.execute(CREATE_SQL)

    def do_role(role):
      # Runs in a worker thread: "hive" writes, anything else reads via Impala.
      try:
        if role == "hive":
          self.__hive_role_write_hive_inserts(tbl_name, partitioned)
        else:
          self.__impala_role_read_hive_inserts(tbl_name)
      except Exception:
        # Print the traceback from the worker thread before re-raising, since
        # map_async() only propagates the exception object itself.
        traceback.print_exc()
        raise

    # TODO: CTRL+C can't interrupt the test
    pool = ThreadPool(processes=2)
    pool.map_async(do_role, ["impala", "hive"]).get(600)

  @SkipIfHive2.acid
  @pytest.mark.stress
  def test_read_hive_inserts_non_partitioned(self, unique_database):
    """Check that Impala can read a non-partitioned ACID table written by Hive."""
    self.__run_test_read_hive_inserts(unique_database, False)

  @SkipIfHive2.acid
  @pytest.mark.stress
  def test_read_hive_inserts_partitioned(self, unique_database):
    """Check that Impala can read a partitioned ACID table written by Hive.
    Currently only writes a single partition.
    """
    self.__run_test_read_hive_inserts(unique_database, True)