blob: 35f380e310819e5382ad584aefb4e24f60ccc43a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from builtins import map, range
import pytest
import random
import time
from multiprocessing import Value
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.parametrize import UniqueDatabase
from tests.common.test_vector import ImpalaTestDimension
from tests.stress.stress_util import run_tasks, Task
from tests.util.filesystem_utils import IS_HDFS
# Longer-running UPDATE tests are executed here
class TestIcebergV2UpdateStress(ImpalaTestSuite):
  """UPDATE tests against Iceberg V2 tables."""
  # Batch sizes exercised by the test matrix; exhaustive runs cover more values.
  BATCH_SIZES = [0, 32]
  EXHAUSTIVE_BATCH_SIZES = [0, 1, 11, 32]

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestIcebergV2UpdateStress, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: v.get_value('table_format').file_format == 'parquet')
    # Core runs use the smaller batch-size set; exhaustive runs cover all of them.
    if cls.exploration_strategy() == 'core':
      sizes = TestIcebergV2UpdateStress.BATCH_SIZES
    else:
      sizes = TestIcebergV2UpdateStress.EXHAUSTIVE_BATCH_SIZES
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('batch_size', *sizes))

  def test_update_stress(self, vector, unique_database):
    """Runs the longer-running UPDATE workload; on HDFS also cross-checks the
    resulting table contents from Hive."""
    self.run_test_case('QueryTest/iceberg-update-stress', vector,
        unique_database)
    if IS_HDFS:
      self._update_stress_hive_tests(unique_database)

  def _update_stress_hive_tests(self, db):
    """Queries the updated table from Hive and verifies the expected aggregate
    values (skips the header line of the Hive output)."""
    stmt = """
        SELECT count(*), sum(ss_ticket_number)
        FROM {}.ice_store_sales
        WHERE ss_item_sk % 1999 = 0""".format(db)
    hive_results = self.run_stmt_in_hive(stmt).split("\n", 1)[1]
    assert hive_results == "3138,848464922\n"
# Stress test for concurrent UPDATE operations against Iceberg tables.
class TestIcebergConcurrentUpdateStress(ImpalaTestSuite):
  """Stress test for concurrent UPDATE operations against Iceberg tables. Writer
  tasks increment columns of a shared table while checker tasks verify that every
  visible snapshot satisfies the table's invariant."""

  @classmethod
  def get_workload(cls):
    # Fixed: the first parameter of a classmethod is the class; it was misleadingly
    # named 'self' (consistent now with TestIcebergV2UpdateStress.get_workload).
    return 'targeted-stress'

  @classmethod
  def add_test_dimensions(cls):
    super(TestIcebergConcurrentUpdateStress, cls).add_test_dimensions()
    # Run against a single table format only to avoid redundant stress runs.
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: (v.get_value('table_format').file_format == 'parquet'
                   and v.get_value('table_format').compression_codec == 'snappy'))

  def _impala_role_concurrent_writer(self, tbl_name, col, num_updates):
    """Increments values in column 'total' and in the column which is passed in
    'col'. Retries until 'num_updates' UPDATE statements succeeded; statements are
    issued against a randomly chosen impalad."""
    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
    try:
      update_cnt = 0
      while update_cnt < num_updates:
        try:
          impalad_client.execute(
              "update {0} set total = total + 1, {1} = {1} + 1".format(tbl_name, col))
          update_cnt += 1
          # Sleep after a successful operation.
          time.sleep(random.random())
        except Exception:
          # Exceptions are expected due to concurrent operations.
          pass
    finally:
      # Close the client even if an unexpected error escapes the loop.
      impalad_client.close()

  def _impala_role_concurrent_checker(self, tbl_name, target_total):
    """Checks if the table's invariant is true. The invariant is that the equation
    'total == a + b + c' is true. Runs until 'total' reaches 'target_total'."""
    def verify_result_set(result):
      assert len(result.data) == 1
      line = result.data[0]
      [total, a, b, c] = list(map(int, (line.split('\t'))))
      assert total == a + b + c
      return total

    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
    try:
      total = 0
      while total < target_total:
        result = impalad_client.execute("select * from %s" % tbl_name)
        new_total = verify_result_set(result)
        # 'total' must grow monotonically across snapshots.
        assert total <= new_total
        total = new_total
        time.sleep(random.random())
    finally:
      impalad_client.close()

  @pytest.mark.stress
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_iceberg_updates(self, unique_database):
    """Issues UPDATE statements against multiple impalads in a way that some
    invariants must be true when a spectator process inspects the table. E.g.
    the value of a column should be equal to the sum of other columns."""
    tbl_name = "%s.test_concurrent_updates" % unique_database
    self.client.set_configuration_option("SYNC_DDL", "true")
    self.client.execute("""create table {0}
        (total bigint, a bigint, b bigint, c bigint)
        stored as iceberg
        tblproperties('format-version'='2')""".format(tbl_name))
    self.client.execute(
        "insert into {} values (0, 0, 0, 0)".format(tbl_name))
    num_checkers = 2
    cols = 3
    updates_per_col = 30
    # Every successful UPDATE increments 'total' once, so the final value is known.
    target_total = updates_per_col * cols
    updater_a = Task(self._impala_role_concurrent_writer, tbl_name, "a", updates_per_col)
    updater_b = Task(self._impala_role_concurrent_writer, tbl_name, "b", updates_per_col)
    updater_c = Task(self._impala_role_concurrent_writer, tbl_name, "c", updates_per_col)
    checkers = [Task(self._impala_role_concurrent_checker, tbl_name, target_total)
                for i in range(0, num_checkers)]
    run_tasks([updater_a, updater_b, updater_c] + checkers)
class TestIcebergConcurrentOperations(ImpalaTestSuite):
  """This test checks that concurrent DELETE and UPDATE operations leave the table
  in a consistent state."""

  @classmethod
  def get_workload(cls):
    # Fixed: the first parameter of a classmethod is the class; it was misleadingly
    # named 'self'.
    return 'targeted-stress'

  @classmethod
  def add_test_dimensions(cls):
    super(TestIcebergConcurrentOperations, cls).add_test_dimensions()
    # Run against a single table format only to avoid redundant stress runs.
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: (v.get_value('table_format').file_format == 'parquet'
                   and v.get_value('table_format').compression_codec == 'snappy'))

  def _impala_role_concurrent_deleter(self, tbl_name, all_rows_deleted, num_rows):
    """Deletes every row from the table one by one, retrying on failure. Sets
    'all_rows_deleted' to 1 when all 'num_rows' rows have been deleted."""
    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
    try:
      impalad_client.set_configuration_option("SYNC_DDL", "true")
      i = 0
      while i < num_rows:
        try:
          impalad_client.execute(
              "delete from {0} WHERE id = {1}".format(tbl_name, i))
          i += 1
        except Exception as e:
          # Exceptions are expected due to concurrent operations.
          print(str(e))
        time.sleep(random.random())
      # Signal the other tasks that they can stop.
      all_rows_deleted.value = 1
    finally:
      impalad_client.close()

  def _impala_role_concurrent_writer(self, tbl_name, all_rows_deleted):
    """Updates every row in the table in a loop until the deleter signals that
    all rows are gone."""
    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
    try:
      impalad_client.set_configuration_option("SYNC_DDL", "true")
      while all_rows_deleted.value != 1:
        try:
          impalad_client.execute(
              "update {0} set j = j + 1".format(tbl_name))
        except Exception as e:
          # Exceptions are expected due to concurrent operations.
          print(str(e))
        time.sleep(random.random())
    finally:
      impalad_client.close()

  def _impala_role_concurrent_optimizer(self, tbl_name, all_rows_deleted):
    """Optimizes the table in a loop until the deleter signals that all rows are
    gone."""
    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
    try:
      impalad_client.set_configuration_option("SYNC_DDL", "true")
      while all_rows_deleted.value != 1:
        try:
          impalad_client.execute("optimize table {0}".format(tbl_name))
        except Exception as e:
          # Exceptions are expected due to concurrent operations.
          print(str(e))
        time.sleep(random.random())
    finally:
      impalad_client.close()

  def _impala_role_concurrent_checker(self, tbl_name, all_rows_deleted, num_rows):
    """Checks if the table's invariant is true. The invariant is that we have a
    consecutive range of 'id's starting from N to num_rows - 1. And 'j's are equal."""
    def verify_result_set(result):
      # An empty table is also valid: all rows may already be deleted.
      if len(result.data) == 0: return
      line = result.data[0]
      [prev_id, prev_j] = list(map(int, (line.split('\t'))))
      for line in result.data[1:]:
        [id, j] = list(map(int, (line.split('\t'))))
        assert id - prev_id == 1
        assert j == prev_j
        prev_id = id
        prev_j = j
      # Rows are deleted in increasing id order, so the highest id survives longest.
      assert prev_id == num_rows - 1

    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
    try:
      while all_rows_deleted.value != 1:
        result = impalad_client.execute("select * from %s order by id" % tbl_name)
        verify_result_set(result)
        time.sleep(random.random())
    finally:
      impalad_client.close()

  @pytest.mark.stress
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_iceberg_deletes_and_updates(self, unique_database):
    """Issues DELETE and UPDATE statements in parallel in a way that some
    invariants must be true when a spectator process inspects the table."""
    tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database
    self.client.set_configuration_option("SYNC_DDL", "true")
    self.client.execute("""create table {0} (id int, j bigint)
        stored as iceberg
        tblproperties('format-version'='2')""".format(tbl_name))
    num_rows = 10
    # One INSERT per row so each row lands in its own data file.
    for i in range(num_rows):
      self.client.execute("insert into {} values ({}, 0)".format(tbl_name, i))
    all_rows_deleted = Value('i', 0)
    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, all_rows_deleted,
        num_rows)
    updater = Task(self._impala_role_concurrent_writer, tbl_name, all_rows_deleted)
    checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted,
        num_rows)
    run_tasks([deleter, updater, checker])
    result = self.client.execute("select count(*) from {}".format(tbl_name))
    assert result.data == ['0']

  @pytest.mark.stress
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_iceberg_deletes_and_updates_and_optimize(self, unique_database):
    """Issues DELETE and UPDATE statements in parallel in a way that some
    invariants must be true when a spectator process inspects the table.
    An optimizer thread also invokes OPTIMIZE regularly on the table."""
    tbl_name = "%s.test_concurrent_write_and_optimize" % unique_database
    self.client.set_configuration_option("SYNC_DDL", "true")
    self.client.execute("""create table {0} (id int, j bigint)
        stored as iceberg
        tblproperties('format-version'='2')""".format(tbl_name))
    num_rows = 10
    # Insert all 'num_rows' records with a single INSERT statement.
    values_str = ", ".join("({}, 0)".format(i) for i in range(num_rows))
    self.client.execute("insert into {} values {}".format(tbl_name, values_str))
    all_rows_deleted = Value('i', 0)
    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, all_rows_deleted,
        num_rows)
    updater = Task(self._impala_role_concurrent_writer, tbl_name, all_rows_deleted)
    optimizer = Task(self._impala_role_concurrent_optimizer, tbl_name,
        all_rows_deleted)
    checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted,
        num_rows)
    run_tasks([deleter, updater, optimizer, checker])
    result = self.client.execute("select count(*) from {}".format(tbl_name))
    assert result.data == ['0']