| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
# This module contains utility functions for creating test tables from Parquet, ORC and
# Iceberg data files, and other functions used for checking for strings in files and
# directories.
| |
| from __future__ import absolute_import, division, print_function |
| import os |
| import re |
| import shutil |
| import tempfile |
| from subprocess import check_call |
| |
| from tests.util.filesystem_utils import get_fs_path, WAREHOUSE_PREFIX |
| from tests.util.iceberg_metadata_util import rewrite_metadata |
| from tests.util.retry import retry |
| |
| |
| def create_iceberg_table_from_directory(impala_client, unique_database, table_name, |
| file_format, table_location="${IMPALA_HOME}/testdata/data/iceberg_test", |
| warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")): |
| """Utility function to create an iceberg table from a directory.""" |
| |
| if not warehouse_prefix and unique_database: |
| warehouse_prefix = os.getenv("DEFAULT_FS", WAREHOUSE_PREFIX) |
| |
  # Only 'orc' and 'parquet' are tested/supported
  assert file_format in ("orc", "parquet")
| |
| table_location = os.path.expandvars(table_location) |
| local_dir = os.path.join(table_location, table_name) |
| assert os.path.isdir(local_dir) |
| |
  # Rewrite the Iceberg metadata to use the warehouse prefix and unique_database.
  # Create the temp dir first so 'cp -r' copies the local dir with its original name
  # under it. rewrite_metadata() assumes that the parent directory of the 'metadata'
  # directory bears the name of the table.
  tmp_dir = tempfile.mkdtemp(suffix=table_name)
  check_call(['cp', '-r', local_dir, tmp_dir])
| local_dir = os.path.join(tmp_dir, table_name) |
| rewrite_metadata(warehouse_prefix, unique_database, os.path.join(local_dir, 'metadata')) |
| |
| # Put the directory in the database's directory (not the table directory) |
| hdfs_parent_dir = os.path.join(get_fs_path("/test-warehouse"), unique_database) |
| hdfs_dir = os.path.join(hdfs_parent_dir, table_name) |
| |
| # Purge existing files if any |
| check_call(['hdfs', 'dfs', '-rm', '-f', '-r', hdfs_dir]) |
| |
| # Note: -d skips a staging copy |
| check_call(['hdfs', 'dfs', '-mkdir', '-p', hdfs_parent_dir]) |
| check_call(['hdfs', 'dfs', '-put', '-d', local_dir, hdfs_parent_dir]) |
| |
| # Create external table |
| qualified_table_name = '{0}.{1}'.format(unique_database, table_name) |
| impala_client.execute("""create external table {0} stored as iceberg location '{1}' |
| tblproperties('write.format.default'='{2}', 'iceberg.catalog'= |
| 'hadoop.tables')""".format(qualified_table_name, hdfs_dir, |
| file_format)) |
| |
| # Automatic clean up after drop table |
| impala_client.execute("""alter table {0} set tblproperties ('external.table.purge'= |
| 'True');""".format(qualified_table_name)) |
| |
| |
| def create_table_from_parquet(impala_client, unique_database, table_name): |
| """Utility function to create a database table from a Parquet file. A Parquet file must |
| exist in $IMPALA_HOME/testdata/data with the name 'table_name'.parquet""" |
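  # Example usage (hypothetical table name; assumes testdata/data/my_table.parquet
  # exists):
  #   create_table_from_parquet(client, unique_database, 'my_table')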
| create_table_from_file(impala_client, unique_database, table_name, 'parquet') |
| |
| |
| def create_table_from_orc(impala_client, unique_database, table_name): |
| """Utility function to create a database table from a Orc file. A Orc file must |
| exist in $IMPALA_HOME/testdata/data with the name 'table_name'.orc""" |
| create_table_from_file(impala_client, unique_database, table_name, 'orc') |
| |
| |
| def create_table_from_file(impala_client, unique_database, table_name, file_format): |
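  """Creates table 'table_name' in 'unique_database' from the file
  '<table_name>.<file_format>' under $IMPALA_HOME/testdata/data. The schema is derived
  from the file via 'CREATE TABLE ... LIKE <file_format>' and the file is then loaded
  with LOAD DATA."""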
| filename = '{0}.{1}'.format(table_name, file_format) |
| local_file = os.path.join(os.environ['IMPALA_HOME'], |
| 'testdata/data/{0}'.format(filename)) |
| assert os.path.isfile(local_file) |
| |
| # Put the file in the database's directory (not the table directory) so it is |
| # available for a LOAD DATA statement. |
| hdfs_file = get_fs_path( |
| os.path.join("/test-warehouse", "{0}.db".format(unique_database), filename)) |
| # Note: -d skips a staging copy |
| check_call(['hdfs', 'dfs', '-put', '-f', '-d', local_file, hdfs_file]) |
| |
| # Create the table and load the file |
| qualified_table_name = '{0}.{1}'.format(unique_database, table_name) |
| impala_client.execute('create table {0} like {1} "{2}" stored as {1}'.format( |
| qualified_table_name, file_format, hdfs_file)) |
| impala_client.execute('load data inpath "{0}" into table {1}'.format( |
| hdfs_file, qualified_table_name)) |
| |
| |
| def create_table_and_copy_files(impala_client, create_stmt, unique_database, table_name, |
| files): |
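  """Creates a table from 'create_stmt' (with the '{db}' and '{tbl}' placeholders filled
  in), copies 'files' (paths relative to $IMPALA_HOME) into the table's directory under
  /test-warehouse, and refreshes the table so the new files are visible."""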
| # Create the table |
| create_stmt = create_stmt.format(db=unique_database, tbl=table_name) |
| impala_client.execute(create_stmt) |
| |
| hdfs_dir = get_fs_path( |
| os.path.join("/test-warehouse", unique_database + ".db", table_name)) |
| copy_files_to_hdfs_dir(files, hdfs_dir) |
| |
| # Refresh the table metadata to see the new files |
| refresh_stmt = "refresh {0}.{1}".format(unique_database, table_name) |
| impala_client.execute(refresh_stmt) |
| |
| |
| def copy_files_to_hdfs_dir(files, hdfs_dir): |
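  """Copies the local 'files' (paths relative to $IMPALA_HOME) to 'hdfs_dir' using a
  single 'hdfs dfs -put'."""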
| # Copy the files |
| # - build a list of source files |
| # - issue a single put to the hdfs_dir ( -d skips a staging copy) |
| source_files = [] |
| for local_file in files: |
| # Cut off leading '/' to make os.path.join() happy |
| local_file = local_file if local_file[0] != '/' else local_file[1:] |
| local_file = os.path.join(os.environ['IMPALA_HOME'], local_file) |
| assert os.path.isfile(local_file) |
| source_files.append(local_file) |
| check_call(['hdfs', 'dfs', '-put', '-f', '-d'] + source_files + [hdfs_dir]) |
| |
| |
| def grep_dir(dir, search, filename_search=""): |
  '''Recursively search 'dir' for regular files whose path matches the 'filename_search'
  regex and whose contents match the 'search' regex. Symlinks are skipped. Returns a
  dict mapping file names to their matching lines.
  '''
| filename_matcher = re.compile(filename_search) |
| matching_files = dict() |
| for dir_name, _, file_names in os.walk(dir): |
| for file_name in file_names: |
| file_path = os.path.join(dir_name, file_name) |
| if os.path.islink(file_path) or not filename_matcher.search(file_path): |
| continue |
| with open(file_path) as file: |
| matching_lines = grep_file(file, search) |
| if matching_lines: |
| matching_files[file_name] = matching_lines |
| return matching_files |
| |
| |
| def grep_file(file, search): |
| '''Return lines in 'file' that match the 'search' regex. 'file' must already be |
| opened. |
| ''' |
| matcher = re.compile(search) |
| matching_lines = list() |
| for line in file: |
| if matcher.search(line): |
| matching_lines.append(line) |
| return matching_lines |
| |
| |
| def grep_file_first(file, search): |
| """ |
| Searches for a pattern in a file and returns the first match. If no match is found, |
| returns None. |
| |
| file: An opened file object to search within. |
| search: A string containing the regex pattern to search for. |
| |
  return: The match object for the first matching line, otherwise None.
| """ |
| matcher = re.compile(search) |
| for line in file: |
| res = matcher.search(line) |
| if res is not None: |
| return res |
| return None |
| |
| |
| def assert_file_in_dir_contains(dir, search): |
| '''Asserts that at least one file in the 'dir' contains the 'search' term.''' |
| results = grep_dir(dir, search) |
| assert results, "%s should have a file containing '%s' but no file was found" \ |
| % (dir, search) |
| |
| |
| def assert_no_files_in_dir_contain(dir, search): |
  '''Asserts that no file in the 'dir' contains the 'search' term.'''
| results = grep_dir(dir, search) |
| assert not results, \ |
| "%s should not have any file containing '%s' but a file was found" \ |
| % (dir, search) |
| |
| |
| def make_tmp_test_dir(name): |
| """Create temporary directory with prefix 'impala_test_<name>_'. |
| Return the path of temporary directory as string. Caller is responsible to |
| clean them. If LOG_DIR env var exist, the temporary dir will be placed inside |
| LOG_DIR.""" |
| # TODO: Consider using tempfile.TemporaryDirectory from python3 in the future. |
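  # Example usage (hypothetical prefix name):
  #   tmp_dir = make_tmp_test_dir('scratch')
  #   ...
  #   cleanup_tmp_test_dir(tmp_dir)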
| parent_dir = os.getenv("LOG_DIR", None) |
| return tempfile.mkdtemp(prefix='impala_test_{}_'.format(name), dir=parent_dir) |
| |
| |
| def cleanup_tmp_test_dir(dir_path): |
| """Remove temporary 'dir_path' and its content. |
| Ignore errors upon deletion.""" |
| shutil.rmtree(dir_path, ignore_errors=True) |
| |
| |
| def count_lines(file_path, missing_ok=False): |
| """Counts the number of lines in the file located at 'file_path'.""" |
| try: |
| with open(file_path, 'rb') as file: |
      return sum(1 for _ in file)
| except IOError: |
| if missing_ok: |
| return 0 |
| raise |
| |
| |
| def file_ends_with_char(file_path, char="\n"): |
| """ |
| Checks if a file ends with a specified character. |
| |
| Args: |
| file_path: Path to the file to check. |
    char: The character to check for at the end of the file (default is newline).
| |
| Returns: |
| bool: True if the file ends with the specified character, False otherwise. |
| |
| Raises: |
| AssertionError: If char is not a single character. |
| """ |
| assert isinstance(char, str) |
| |
| char = char.encode('utf-8') |
| assert len(char) == 1, "char parameter must be a single character, got: {}".format(char) |
| |
| assert os.path.isfile(file_path), "File does not exist: {}".format(file_path) |
| if os.path.getsize(file_path) == 0: |
| return False |
| |
| with open(file_path, 'rb') as f: |
| # Move to the last character of the file |
| f.seek(-1, os.SEEK_END) |
| last_char = f.read(1) |
| # Check if the last character matches the provided character |
| return last_char == char |
| |
| |
| def wait_for_file_line_count(file_path, expected_line_count, max_attempts=3, |
| sleep_time_s=1, backoff=2, last_char="\n", exact_match=False): |
| """ |
  Waits until the given file contains the expected number of lines or until all retry
  attempts are exhausted. Fails an assert if the expected number of lines is not reached
  in time.
| |
| Args: |
| file_path: Path to the file to check. |
| expected_line_count: Expected number of lines in the file. |
| max_attempts: Maximum number of attempts to check the file (default is 3). |
| sleep_time_s: Time to wait between attempts in seconds (default is 1). |
| backoff: Backoff factor for exponential backoff (default is 2). |
| last_char: Optional character that the file should end with (default is newline). If |
| None, the file is not checked for a specific ending character. |
| exact_match: If True, the function will assert that the file has exactly the expected |
| number of lines. If False, it will check if the file has at least the |
| expected number of lines. |
| |
| Raises: |
| AssertionError: If the file does not reach the expected line count within the given |
| number of attempts or if the file does not end with the specified |
| character (if provided). |
| """ |
  def check_line_count():
| if exact_match: |
| ret = count_lines(file_path) == expected_line_count |
| else: |
| ret = count_lines(file_path) >= expected_line_count |
| |
| if last_char is not None: |
| ret = ret and file_ends_with_char(file_path, last_char) |
| |
| return ret |
| |
  assert retry(check_line_count, max_attempts, sleep_time_s, backoff), \
| "File '{}' did not reach expected line count of '{}'. actual line count: '{}'" \ |
| .format(file_path, expected_line_count, count_lines(file_path)) |